
Commit ee38474

ci: fix TPC-H benchmark workflows (#5123)
## Changes Made

Fixes the UV setup in the benchmark workflows so that they work with our new dependency setup.

I was having issues running a Daft script as a Ray job, and it turns out that #4567 broke the behavior of automatically connecting to the Ray cluster when running inside a Ray job. The `RAY_ADDRESS` env var is set to `<local ip>:6379` in a job script, but that PR prepended `ray://` to it, which causes `ray.init` to try, and fail, to create a Ray Client connection instead of connecting to the cluster directly. I've removed that logic, as well as the logic in Daft that reads `RAY_ADDRESS`, since `ray.init` already does that automatically (a short sketch of the two behaviors follows below).

## Related Issues

<!-- Link to related GitHub issues, e.g., "Closes #123" -->

## Checklist

- [x] Documented in API Docs (if applicable)
- [x] Documented in User Guide (if applicable)
- [x] If adding a new documentation page, doc is added to `docs/mkdocs.yml` navigation
- [x] Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)
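As context for the `RAY_ADDRESS` change, here is a minimal sketch (not part of this diff) of the two behaviors described above, assuming standard Ray semantics: a `ray://`-prefixed address routes through the Ray Client, while `ray.init()` with no address reads `RAY_ADDRESS` on its own.

```python
import os

import ray

# Inside a Ray job, the cluster sets something like RAY_ADDRESS="<local ip>:6379".
print("RAY_ADDRESS =", os.environ.get("RAY_ADDRESS"))

# What #4567 effectively did: prefixing the GCS address with "ray://" makes
# ray.init treat it as a Ray Client server address, which it is not (a Ray
# Client server typically listens on port 10001), so the connection fails.
# ray.init(address="ray://" + os.environ["RAY_ADDRESS"])

# What this PR relies on instead: with no explicit address, ray.init picks up
# RAY_ADDRESS itself and attaches to the existing cluster directly.
ray.init()
print(ray.cluster_resources())
```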
1 parent bc9d370 commit ee38474

File tree

- .github/ci-scripts/distributed_tpch.py
- .github/workflows/benchmark-distributed-tpch.yml
- .github/workflows/benchmark-local-tpch.yml
- benchmarking/tpch/answers_sql.py
- daft/runners/ray_runner.py
- src/daft-context/src/lib.rs

6 files changed: +37 -41 lines changed

.github/ci-scripts/distributed_tpch.py

Lines changed: 1 addition & 9 deletions
@@ -6,7 +6,6 @@
 import os
 import sys
 import time
-import urllib.request
 
 from ray.job_submission import JobDetails, JobStatus, JobSubmissionClient
 
@@ -39,13 +38,10 @@ def run_benchmark():
 
     client = JobSubmissionClient(address="http://localhost:8265")
 
-    # Create the directory if it doesn't exist
-    os.makedirs("traces", exist_ok=True)
-
     for q in range(1, 23):
         print(f"Running TPC-H Q{q}... ", end="", flush=True)
 
-        start = time.perf_counter()
+        start: float = time.perf_counter()
 
         submission_id = client.submit_job(
             entrypoint=f"DAFT_RUNNER=ray DAFT_PROGRESS_BAR=0 python answers_sql.py {parquet_path} {q}",
@@ -56,10 +52,6 @@ def run_benchmark():
 
         end = time.perf_counter()
 
-        # Download the trace file
-        trace_url = f"http://localhost:8265/api/v0/tasks/timeline?download=1&job_id={job_details.job_id}"
-        urllib.request.urlretrieve(trace_url, f"traces/q{q}-trace.json")
-
         if job_details.status != JobStatus.SUCCEEDED:
             print(f"\nRay job did not succeed, received job status: {job_details.status}\nJob details: {job_details}")
             sys.exit(1)
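For readers unfamiliar with the surrounding script, the loop above is built on Ray's Job Submission API. A rough standalone sketch of that submit-and-poll pattern (hypothetical entrypoint and placeholder path, not the script itself):

```python
import sys
import time

from ray.job_submission import JobStatus, JobSubmissionClient

# Talk to the Ray dashboard forwarded to localhost (as in the workflow above).
client = JobSubmissionClient(address="http://localhost:8265")

# Hypothetical entrypoint; the real script formats one per TPC-H question.
submission_id = client.submit_job(
    entrypoint="DAFT_RUNNER=ray python answers_sql.py <parquet_path> 1",
)

# Poll until the job reaches a terminal state.
while True:
    status = client.get_job_status(submission_id)
    if status in {JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.STOPPED}:
        break
    time.sleep(5)

job_details = client.get_job_info(submission_id)
if job_details.status != JobStatus.SUCCEEDED:
    print(f"Ray job did not succeed, received job status: {job_details.status}")
    sys.exit(1)
```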

.github/workflows/benchmark-distributed-tpch.yml

Lines changed: 13 additions & 10 deletions
@@ -75,9 +75,13 @@ jobs:
         uses: astral-sh/setup-uv@v6
         with:
           python-version: ${{ env.PYTHON_VERSION }}
-          version: "0.6.17"
-      - name: Install Daft and dev dependencies
+          enable-cache: true
+          cache-dependency-glob: "**/pyproject.toml"
+      - name: Setup Virtual Env
         run: |
+          uv venv --seed .venv
+          echo "$GITHUB_WORKSPACE/.venv/bin" >> $GITHUB_PATH
+          source .venv/bin/activate
           rm -rf daft
           uv pip install daft --pre --extra-index-url ${{ env.DAFT_INDEX_URL }}
           uv pip install gspread ray[default] boto3
@@ -98,18 +102,17 @@ jobs:
           chmod 600 ~/.ssh/ci-github-actions-ray-cluster-key.pem
       - name: Spin up and connect to ray cluster
         run: |
+          source .venv/bin/activate
           ray up ray.yaml -y
           ray dashboard ray.yaml &
 
       - name: Run benchmark and upload results to Google Sheets
-        run: PYTHONPATH=. python .github/ci-scripts/distributed_tpch.py
-
-      - name: Upload traces
-        uses: actions/upload-artifact@v4
-        with:
-          name: traces
-          path: traces
+        run: |
+          source .venv/bin/activate
+          PYTHONPATH=. python .github/ci-scripts/distributed_tpch.py
 
       - name: Spin down ray cluster
         if: always()
-        run: ray down ray.yaml -y
+        run: |
+          source .venv/bin/activate
+          ray down ray.yaml -y

.github/workflows/benchmark-local-tpch.yml

Lines changed: 10 additions & 4 deletions
@@ -58,17 +58,23 @@ jobs:
         uses: astral-sh/setup-uv@v6
         with:
           python-version: ${{ env.PYTHON_VERSION }}
-          version: "0.6.17"
-      - name: Install Daft and dev dependencies
+          enable-cache: true
+          cache-dependency-glob: "**/pyproject.toml"
+      - name: Setup Virtual Env
         run: |
+          uv venv --seed .venv
+          echo "$GITHUB_WORKSPACE/.venv/bin" >> $GITHUB_PATH
+          source .venv/bin/activate
           rm -rf daft
           uv pip install daft --pre --extra-index-url ${{ env.DAFT_INDEX_URL }}
-          uv pip install gspread
+          uv pip install gspread ray[default] boto3
       - name: Write service account secret file
         run: |
           mkdir -p ~/.config/gspread
           cat << EOF > ~/.config/gspread/service_account.json
           ${{ secrets.GOOGLE_SHEETS_SERVICE_ACCOUNT }}
           EOF
       - name: Run benchmark and upload results to Google Sheets
-        run: PYTHONPATH=. DAFT_RUNNER=native python .github/ci-scripts/local_tpch.py
+        run: |
+          source .venv/bin/activate
+          PYTHONPATH=. DAFT_RUNNER=native python .github/ci-scripts/local_tpch.py

benchmarking/tpch/answers_sql.py

Lines changed: 4 additions & 0 deletions
@@ -80,6 +80,10 @@ def get_answer(q: int, get_df) -> daft.DataFrame:
 
 
 def main(parquet_path, q):
+    if q in (11, 22):
+        # TODO: remove this once we support cross joins in Flotilla
+        daft.set_execution_config(use_legacy_ray_runner=True)
+
     s3_config_from_env = S3Config.from_env()
     io_config = IOConfig(s3=s3_config_from_env)
 

daft/runners/ray_runner.py

Lines changed: 7 additions & 14 deletions
@@ -7,7 +7,6 @@
 import threading
 import time
 import uuid
-import warnings
 from collections.abc import Generator, Iterable, Iterator
 from datetime import datetime
 from queue import Full, Queue
@@ -268,7 +267,7 @@ def _to_pandas_ref(df: pd.DataFrame | ray.ObjectRef) -> ray.ObjectRef:
     elif isinstance(df, ray.ObjectRef):
         return df
     else:
-        raise ValueError("Expected a Ray object ref or a Pandas DataFrame, " f"got {type(df)}")
+        raise ValueError(f"Expected a Ray object ref or a Pandas DataFrame, got {type(df)}")
 
 
 class RayPartitionSet(PartitionSet[ray.ObjectRef]):
@@ -887,8 +886,7 @@ def _run_plan(
 
         start = datetime.now()
         profile_filename = (
-            f"profile_RayRunner.run()_"
-            f"{datetime.replace(datetime.now(), second=0, microsecond=0).isoformat()[:-3]}.json"
+            f"profile_RayRunner.run()_{datetime.replace(datetime.now(), second=0, microsecond=0).isoformat()[:-3]}.json"
         )
 
         with profiler(profile_filename), ray_tracing.ray_tracer(result_uuid, daft_execution_config) as runner_tracer:
@@ -1234,16 +1232,11 @@ def __init__(
                    address,
                )
        else:
-            if address is not None:
-                if not address.endswith("10001"):
-                    warnings.warn(
-                        f"The address to a Ray client server is typically at port :10001, but instead we found: {address}"
-                    )
-                if not address.startswith("ray://"):
-                    warnings.warn(
-                        f"Expected Ray address to start with 'ray://' protocol but found: {address}. Automatically prefixing your address with the protocol to make a Ray connection: ray://{address}"
-                    )
-                    address = "ray://" + address
+            if address is not None and address.startswith("ray://"):
+                logger.warning(
+                    "Specifying a Ray address with the 'ray://' prefix uses the Ray Client, which may impact performance. If this is running in a Ray job, you may not need to specify the address at all."
+                )
+
            ray.init(address=address)
 
            # Check if Ray is running in "client mode" (connected to a Ray cluster via a Ray client)
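To illustrate what the removed prefixing logic changed, here is a small sketch (not Daft code) assuming `ray.util.client.ray.is_connected()` as one probe for Ray Client mode, which is the "client mode" the surrounding context refers to:

```python
import ray
from ray.util.client import ray as ray_client

# With no explicit address, ray.init attaches to an existing cluster via
# RAY_ADDRESS (inside a job) or starts a local one; no Ray Client is involved.
ray.init()

# This should only report True when the session went through the Ray Client,
# i.e. when a "ray://" address was used, which is the case the new warning covers.
print("client mode:", ray_client.is_connected())
```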

src/daft-context/src/lib.rs

Lines changed: 2 additions & 4 deletions
@@ -312,19 +312,17 @@ fn parse_usize_env_var(var_name: &str) -> Option<usize> {
 #[cfg(feature = "python")]
 fn get_ray_runner_config_from_env() -> RunnerConfig {
     const DAFT_RAY_ADDRESS: &str = "DAFT_RAY_ADDRESS";
-    const RAY_ADDRESS: &str = "RAY_ADDRESS";
     const DAFT_DEVELOPER_RAY_MAX_TASK_BACKLOG: &str = "DAFT_DEVELOPER_RAY_MAX_TASK_BACKLOG";
     const DAFT_RAY_FORCE_CLIENT_MODE: &str = "DAFT_RAY_FORCE_CLIENT_MODE";
 
     let address = if let Ok(address) = std::env::var(DAFT_RAY_ADDRESS) {
         log::warn!(
-            "Detected usage of the ${} environment variable. This will be deprecated, please use ${} instead.",
+            "Detected usage of the ${} environment variable. This will be deprecated, please use $RAY_ADDRESS instead.",
             DAFT_RAY_ADDRESS,
-            RAY_ADDRESS
         );
         Some(address)
     } else {
-        std::env::var(RAY_ADDRESS).ok()
+        None
     };
     let max_task_backlog = parse_usize_env_var(DAFT_DEVELOPER_RAY_MAX_TASK_BACKLOG);
     let force_client_mode = parse_bool_env_var(DAFT_RAY_FORCE_CLIENT_MODE);
