Skip to content

Commit 0cbf38e

Browse files
siri-varmaartur-ciocanusalaboy
committed
Add Failure Policy for Jobs SDK (dapr#1448)
* Update CONTRIBUTING.md Signed-off-by: Siri Varma Vegiraju <[email protected]> * Add failrue policy Signed-off-by: sirivarma <[email protected]> * Add tests Signed-off-by: sirivarma <[email protected]> * Add Tests Signed-off-by: sirivarma <[email protected]> * Upgrading to 1.15.7 (dapr#1458) * upgrading to 1.15.7 Signed-off-by: salaboy <[email protected]> * using DAPR VERSION Signed-off-by: salaboy <[email protected]> --------- Signed-off-by: salaboy <[email protected]> Signed-off-by: siri-varma <[email protected]> * Rename classes Signed-off-by: siri-varma <[email protected]> * add rc Signed-off-by: sirivarma <[email protected]> * fix checkstyle Signed-off-by: sirivarma <[email protected]> * Fix things Signed-off-by: sirivarma <[email protected]> * Test latest Signed-off-by: sirivarma <[email protected]> * fix checkstyle Signed-off-by: sirivarma <[email protected]> * Address comments Signed-off-by: sirivarma <[email protected]> * Address comments Signed-off-by: sirivarma <[email protected]> --------- Signed-off-by: Siri Varma Vegiraju <[email protected]> Signed-off-by: sirivarma <[email protected]> Signed-off-by: salaboy <[email protected]> Signed-off-by: siri-varma <[email protected]> Co-authored-by: artur-ciocanu <[email protected]> Co-authored-by: salaboy <[email protected]> Signed-off-by: siri-varma <[email protected]>
1 parent 1b63c76 commit 0cbf38e

File tree

9 files changed

+515
-1
lines changed

9 files changed

+515
-1
lines changed

sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprJobsIT.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414
package io.dapr.it.testcontainers;
1515

1616
import io.dapr.client.DaprPreviewClient;
17+
import io.dapr.client.domain.ConstantFailurePolicy;
1718
import io.dapr.client.domain.DeleteJobRequest;
19+
import io.dapr.client.domain.DropFailurePolicy;
20+
import io.dapr.client.domain.FailurePolicyType;
1821
import io.dapr.client.domain.GetJobRequest;
1922
import io.dapr.client.domain.GetJobResponse;
2023
import io.dapr.client.domain.JobSchedule;
@@ -24,6 +27,7 @@
2427
import org.junit.jupiter.api.BeforeEach;
2528
import org.junit.jupiter.api.Tag;
2629
import org.junit.jupiter.api.Test;
30+
import org.junit.runner.notification.Failure;
2731
import org.springframework.beans.factory.annotation.Autowired;
2832
import org.springframework.boot.test.context.SpringBootTest;
2933
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -33,6 +37,7 @@
3337
import org.testcontainers.junit.jupiter.Container;
3438
import org.testcontainers.junit.jupiter.Testcontainers;
3539

40+
import java.time.Duration;
3641
import java.time.Instant;
3742
import java.time.ZoneOffset;
3843
import java.time.format.DateTimeFormatter;
@@ -96,6 +101,9 @@ public void testJobScheduleCreationWithDueTime() {
96101

97102
GetJobResponse getJobResponse =
98103
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
104+
105+
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
106+
99107
assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
100108
assertEquals("Job", getJobResponse.getName());
101109
}
@@ -111,6 +119,9 @@ public void testJobScheduleCreationWithSchedule() {
111119

112120
GetJobResponse getJobResponse =
113121
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
122+
123+
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
124+
114125
assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
115126
assertEquals(JobSchedule.hourly().getExpression(), getJobResponse.getSchedule().getExpression());
116127
assertEquals("Job", getJobResponse.getName());
@@ -132,6 +143,9 @@ public void testJobScheduleCreationWithAllParameters() {
132143

133144
GetJobResponse getJobResponse =
134145
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
146+
147+
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
148+
135149
assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
136150
assertEquals("2 * 3 * * FRI", getJobResponse.getSchedule().getExpression());
137151
assertEquals("Job", getJobResponse.getName());
@@ -141,6 +155,57 @@ public void testJobScheduleCreationWithAllParameters() {
141155
getJobResponse.getTtl().toString());
142156
}
143157

158+
@Test
159+
public void testJobScheduleCreationWithDropFailurePolicy() {
160+
Instant currentTime = Instant.now();
161+
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
162+
.withZone(ZoneOffset.UTC);
163+
164+
String cronExpression = "2 * 3 * * FRI";
165+
166+
daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime)
167+
.setTtl(currentTime.plus(2, ChronoUnit.HOURS))
168+
.setData("Job data".getBytes())
169+
.setRepeat(3)
170+
.setFailurePolicy(new DropFailurePolicy())
171+
.setSchedule(JobSchedule.fromString(cronExpression))).block();
172+
173+
GetJobResponse getJobResponse =
174+
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
175+
176+
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
177+
178+
assertEquals(FailurePolicyType.DROP, getJobResponse.getFailurePolicy().getFailurePolicyType());
179+
}
180+
181+
@Test
182+
public void testJobScheduleCreationWithConstantFailurePolicy() {
183+
Instant currentTime = Instant.now();
184+
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
185+
.withZone(ZoneOffset.UTC);
186+
187+
String cronExpression = "2 * 3 * * FRI";
188+
189+
daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime)
190+
.setTtl(currentTime.plus(2, ChronoUnit.HOURS))
191+
.setData("Job data".getBytes())
192+
.setRepeat(3)
193+
.setFailurePolicy(new ConstantFailurePolicy(3)
194+
.setDurationBetweenRetries(Duration.of(10, ChronoUnit.SECONDS)))
195+
.setSchedule(JobSchedule.fromString(cronExpression))).block();
196+
197+
GetJobResponse getJobResponse =
198+
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
199+
200+
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
201+
202+
ConstantFailurePolicy jobFailurePolicyConstant = (ConstantFailurePolicy) getJobResponse.getFailurePolicy();
203+
assertEquals(FailurePolicyType.CONSTANT, getJobResponse.getFailurePolicy().getFailurePolicyType());
204+
assertEquals(3, (int)jobFailurePolicyConstant.getMaxRetries());
205+
assertEquals(Duration.of(10, ChronoUnit.SECONDS).getNano(),
206+
jobFailurePolicyConstant.getDurationBetweenRetries().getNano());
207+
}
208+
144209
@Test
145210
public void testDeleteJobRequest() {
146211
Instant currentTime = Instant.now();

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,18 @@
2727
import io.dapr.client.domain.CloudEvent;
2828
import io.dapr.client.domain.ComponentMetadata;
2929
import io.dapr.client.domain.ConfigurationItem;
30+
import io.dapr.client.domain.ConstantFailurePolicy;
3031
import io.dapr.client.domain.ConversationInput;
3132
import io.dapr.client.domain.ConversationOutput;
3233
import io.dapr.client.domain.ConversationRequest;
3334
import io.dapr.client.domain.ConversationResponse;
3435
import io.dapr.client.domain.DaprMetadata;
3536
import io.dapr.client.domain.DeleteJobRequest;
3637
import io.dapr.client.domain.DeleteStateRequest;
38+
import io.dapr.client.domain.DropFailurePolicy;
3739
import io.dapr.client.domain.ExecuteStateTransactionRequest;
40+
import io.dapr.client.domain.FailurePolicy;
41+
import io.dapr.client.domain.FailurePolicyType;
3842
import io.dapr.client.domain.GetBulkSecretRequest;
3943
import io.dapr.client.domain.GetBulkStateRequest;
4044
import io.dapr.client.domain.GetConfigurationRequest;
@@ -105,6 +109,7 @@
105109
import java.time.Instant;
106110
import java.time.ZoneOffset;
107111
import java.time.format.DateTimeFormatter;
112+
import java.time.temporal.ChronoUnit;
108113
import java.util.ArrayList;
109114
import java.util.Arrays;
110115
import java.util.Collections;
@@ -1340,6 +1345,11 @@ public Mono<Void> scheduleJob(ScheduleJobRequest scheduleJobRequest) {
13401345
jobBuilder.setFailurePolicy(getJobFailurePolicy(scheduleJobRequest.getFailurePolicy()));
13411346
}
13421347

1348+
if (scheduleJobRequest.getFailurePolicy() != null) {
1349+
scheduleJobRequestBuilder.setFailurePolicy(getJobFailurePolicy(scheduleJobRequest.getFailurePolicy()));
1350+
}
1351+
1352+
scheduleJobRequestBuilder.setOverwrite(scheduleJobRequest.getOverwrite());
13431353

13441354
Mono<DaprProtos.ScheduleJobResponse> scheduleJobResponseMono =
13451355
Mono.deferContextual(context -> this.createMono(
@@ -1384,6 +1394,10 @@ public Mono<GetJobResponse> getJob(GetJobRequest getJobRequest) {
13841394
getJobResponse = new GetJobResponse(job.getName(), Instant.parse(job.getDueTime()));
13851395
}
13861396

1397+
if (job.hasFailurePolicy()) {
1398+
getJobResponse.setFailurePolicy(getJobFailurePolicy(job.getFailurePolicy()));
1399+
}
1400+
13871401
return getJobResponse
13881402
.setTtl(job.hasTtl() ? Instant.parse(job.getTtl()) : null)
13891403
.setData(job.hasData() ? job.getData().getValue().toByteArray() : null)
@@ -1394,6 +1408,53 @@ public Mono<GetJobResponse> getJob(GetJobRequest getJobRequest) {
13941408
}
13951409
}
13961410

1411+
private FailurePolicy getJobFailurePolicy(CommonProtos.JobFailurePolicy jobFailurePolicy) {
1412+
if (jobFailurePolicy.hasDrop()) {
1413+
return new DropFailurePolicy();
1414+
}
1415+
1416+
CommonProtos.JobFailurePolicyConstant jobFailurePolicyConstant = jobFailurePolicy.getConstant();
1417+
if (jobFailurePolicyConstant.hasInterval() && jobFailurePolicyConstant.hasMaxRetries()) {
1418+
return new ConstantFailurePolicy(jobFailurePolicyConstant.getMaxRetries())
1419+
.setDurationBetweenRetries(Duration.of(jobFailurePolicyConstant.getInterval().getNanos(),
1420+
ChronoUnit.NANOS));
1421+
}
1422+
1423+
if (jobFailurePolicyConstant.hasMaxRetries()) {
1424+
return new ConstantFailurePolicy(jobFailurePolicyConstant.getMaxRetries());
1425+
}
1426+
1427+
return new ConstantFailurePolicy(
1428+
Duration.of(jobFailurePolicyConstant.getInterval().getNanos(),
1429+
ChronoUnit.NANOS));
1430+
}
1431+
1432+
private CommonProtos.JobFailurePolicy getJobFailurePolicy(FailurePolicy failurePolicy) {
1433+
CommonProtos.JobFailurePolicy.Builder jobFailurePolicyBuilder = CommonProtos.JobFailurePolicy.newBuilder();
1434+
1435+
if (failurePolicy.getFailurePolicyType() == FailurePolicyType.DROP) {
1436+
jobFailurePolicyBuilder.setDrop(CommonProtos.JobFailurePolicyDrop.newBuilder().build());
1437+
return jobFailurePolicyBuilder.build();
1438+
}
1439+
1440+
CommonProtos.JobFailurePolicyConstant.Builder constantPolicyBuilder =
1441+
CommonProtos.JobFailurePolicyConstant.newBuilder();
1442+
ConstantFailurePolicy jobConstantFailurePolicy = (ConstantFailurePolicy)failurePolicy;
1443+
1444+
if (jobConstantFailurePolicy.getMaxRetries() != null) {
1445+
constantPolicyBuilder.setMaxRetries(jobConstantFailurePolicy.getMaxRetries());
1446+
}
1447+
1448+
if (jobConstantFailurePolicy.getDurationBetweenRetries() != null) {
1449+
constantPolicyBuilder.setInterval(com.google.protobuf.Duration.newBuilder()
1450+
.setNanos(jobConstantFailurePolicy.getDurationBetweenRetries().getNano()).build());
1451+
}
1452+
1453+
jobFailurePolicyBuilder.setConstant(constantPolicyBuilder.build());
1454+
1455+
return jobFailurePolicyBuilder.build();
1456+
}
1457+
13971458
/**
13981459
* {@inheritDoc}
13991460
*/
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2021 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.client.domain;
15+
16+
import java.time.Duration;
17+
18+
/**
19+
* A failure policy that applies a constant retry interval for job retries.
20+
* This implementation of {@link FailurePolicy} retries a job a fixed number of times
21+
* with a constant delay between each retry attempt.
22+
*/
23+
public class ConstantFailurePolicy implements FailurePolicy {
24+
25+
private Integer maxRetries;
26+
private Duration durationBetweenRetries;
27+
28+
/**
29+
* Constructs a {@code JobConstantFailurePolicy} with the specified maximum number of retries.
30+
*
31+
* @param maxRetries the maximum number of retries
32+
*/
33+
public ConstantFailurePolicy(Integer maxRetries) {
34+
this.maxRetries = maxRetries;
35+
}
36+
37+
/**
38+
* Constructs a {@code JobConstantFailurePolicy} with the specified duration between retries.
39+
*
40+
* @param durationBetweenRetries the duration to wait between retries
41+
*/
42+
public ConstantFailurePolicy(Duration durationBetweenRetries) {
43+
this.durationBetweenRetries = durationBetweenRetries;
44+
}
45+
46+
/**
47+
* Sets the duration to wait between retry attempts.
48+
*
49+
* @param durationBetweenRetries the duration between retries
50+
* @return a {@code JobFailurePolicyConstant}.
51+
*/
52+
public ConstantFailurePolicy setDurationBetweenRetries(Duration durationBetweenRetries) {
53+
this.durationBetweenRetries = durationBetweenRetries;
54+
return this;
55+
}
56+
57+
/**
58+
* Sets the maximum number of retries allowed.
59+
*
60+
* @param maxRetries the number of retries
61+
* @return a {@code JobFailurePolicyConstant}.
62+
*/
63+
public ConstantFailurePolicy setMaxRetries(int maxRetries) {
64+
this.maxRetries = maxRetries;
65+
return this;
66+
}
67+
68+
/**
69+
* Returns the configured duration between retry attempts.
70+
*
71+
* @return the duration between retries
72+
*/
73+
public Duration getDurationBetweenRetries() {
74+
return this.durationBetweenRetries;
75+
}
76+
77+
/**
78+
* Returns the configured maximum number of retries.
79+
*
80+
* @return the maximum number of retries
81+
*/
82+
public Integer getMaxRetries() {
83+
return this.maxRetries;
84+
}
85+
86+
/**
87+
* Returns the type of failure policy.
88+
*
89+
* @return {@link FailurePolicyType#CONSTANT}
90+
*/
91+
@Override
92+
public FailurePolicyType getFailurePolicyType() {
93+
return FailurePolicyType.CONSTANT;
94+
}
95+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2021 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.client.domain;
15+
16+
/**
17+
* A failure policy that drops the job upon failure without retrying.
18+
* This implementation of {@link FailurePolicy} immediately discards failed jobs
19+
* instead of retrying them.
20+
*/
21+
public class DropFailurePolicy implements FailurePolicy {
22+
23+
/**
24+
* Returns the type of failure policy.
25+
*
26+
* @return {@link FailurePolicyType#DROP}
27+
*/
28+
@Override
29+
public FailurePolicyType getFailurePolicyType() {
30+
return FailurePolicyType.DROP;
31+
}
32+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.client.domain;
15+
16+
/**
17+
* Set a failure policy for the job.
18+
*/
19+
public interface FailurePolicy {
20+
FailurePolicyType getFailurePolicyType();
21+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.client.domain;
15+
16+
public enum FailurePolicyType {
17+
DROP,
18+
19+
CONSTANT
20+
}

0 commit comments

Comments
 (0)