@@ -31,6 +31,8 @@ import (
"github.com/robfig/cron"
kbatch "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ref "k8s.io/client-go/tools/reference"
@@ -68,6 +70,16 @@ type Clock interface {

// +kubebuilder:docs-gen:collapse=Clock

// Definitions to manage status conditions
const (
// typeAvailableCronJob represents the status used when the CronJob is functioning as expected
typeAvailableCronJob = "Available"
// typeProgressingCronJob represents the status used when the CronJob is being reconciled
typeProgressingCronJob = "Progressing"
// typeDegradedCronJob represents the status used when the CronJob has encountered an error
typeDegradedCronJob = "Degraded"
)
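
/*
For these conditions to be persisted at all, the CronJob's status type needs a
Conditions field following the standard metav1.Condition pattern. That part of
the change is not visible in this diff, but it presumably looks something like
this sketch:

	// conditions represent the current state of the CronJob resource
	// +listType=map
	// +listMapKey=type
	Conditions []metav1.Condition `json:"conditions,omitempty"`
*/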

/*
Notice that we need a few more RBAC permissions -- since we're creating and
managing jobs now, we'll need permissions for those, which means adding
@@ -114,11 +126,35 @@ func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
*/
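
/*
The markers referenced above are collapsed out of this diff, but in the upstream
tutorial they take roughly this shape (shown here for context, not as part of
the change):

	// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
	// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get
*/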
var cronJob batchv1.CronJob
if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
log.Error(err, "unable to fetch CronJob")
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
return ctrl.Result{}, client.IgnoreNotFound(err)
if apierrors.IsNotFound(err) {
// If the custom resource is not found, it usually means it was deleted or has not been created yet.
// In that case, we stop the reconciliation; there is nothing to do until a new event arrives.
log.Info("CronJob resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
log.Error(err, "Failed to get CronJob")
return ctrl.Result{}, err
}

// Initialize status conditions if not yet present
if len(cronJob.Status.Conditions) == 0 {
meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
Type: typeProgressingCronJob,
Status: metav1.ConditionUnknown,
Reason: "Reconciling",
Message: "Starting reconciliation",
})
if err := r.Status().Update(ctx, &cronJob); err != nil {
log.Error(err, "Failed to update CronJob status")
return ctrl.Result{}, err
}

// Re-fetch the CronJob after updating the status so we hold the latest
// resourceVersion; otherwise later updates in this reconcile could hit a conflict
if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
log.Error(err, "Failed to re-fetch CronJob")
return ctrl.Result{}, err
}
}
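
/*
A quick aside on the helper used here: meta.SetStatusCondition de-duplicates by
Type, replacing any existing condition of the same Type, and it only bumps
LastTransitionTime when the Status value actually changes. Reading a condition
back is done with helpers from the same package, for example:

	if meta.IsStatusConditionTrue(cronJob.Status.Conditions, typeProgressingCronJob) {
		log.V(1).Info("reconciliation already marked as in progress")
	}
*/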

/*
@@ -131,6 +167,16 @@ func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var childJobs kbatch.JobList
if err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name}); err != nil {
log.Error(err, "unable to list child Jobs")
// Update status condition to reflect the error
meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
Type: typeDegradedCronJob,
Status: metav1.ConditionTrue,
Reason: "ReconciliationError",
Message: fmt.Sprintf("Failed to list child jobs: %v", err),
})
if statusErr := r.Status().Update(ctx, &cronJob); statusErr != nil {
log.Error(statusErr, "Failed to update CronJob status")
}
return ctrl.Result{}, err
}

@@ -247,6 +293,58 @@ func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
*/
log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))

// Check if CronJob is suspended
isSuspended := cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend

// Update status conditions based on current state
if isSuspended {
meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
Type: typeAvailableCronJob,
Status: metav1.ConditionFalse,
Reason: "Suspended",
Message: "CronJob is suspended",
})
} else if len(failedJobs) > 0 {
meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
Type: typeDegradedCronJob,
Status: metav1.ConditionTrue,
Reason: "JobsFailed",
Message: fmt.Sprintf("%d job(s) have failed", len(failedJobs)),
})
meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
Type: typeAvailableCronJob,
Status: metav1.ConditionFalse,
Reason: "JobsFailed",
Message: fmt.Sprintf("%d job(s) have failed", len(failedJobs)),
})
} else if len(activeJobs) > 0 {
meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
Type: typeProgressingCronJob,
Status: metav1.ConditionTrue,
Reason: "JobsActive",
Message: fmt.Sprintf("%d job(s) are currently active", len(activeJobs)),
})
meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
Type: typeAvailableCronJob,
Status: metav1.ConditionTrue,
Reason: "JobsActive",
Message: fmt.Sprintf("CronJob is progressing with %d active job(s)", len(activeJobs)),
})
} else {
meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
Type: typeAvailableCronJob,
Status: metav1.ConditionTrue,
Reason: "AllJobsCompleted",
Message: "All jobs have completed successfully",
})
meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
Type: typeProgressingCronJob,
Status: metav1.ConditionFalse,
Reason: "NoJobsActive",
Message: "No jobs are currently active",
})
}
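
/*
One subtlety: meta.SetStatusCondition only touches the condition whose Type
matches, so a Degraded=True condition set by an earlier reconcile survives the
happy-path branches above. If that is not the intent, the all-clear branch could
reset it explicitly -- a sketch, not part of this change:

	meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
		Type:    typeDegradedCronJob,
		Status:  metav1.ConditionFalse,
		Reason:  "ReconciliationSucceeded",
		Message: "No errors detected",
	})
*/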

/*
Using the data we've gathered, we'll update the status of our CRD.
Just like before, we use our client. To specifically update the status
@@ -400,6 +498,16 @@ func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
if err != nil {
log.Error(err, "unable to figure out CronJob schedule")
// Update status condition to reflect the schedule error
meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
Type: typeDegradedCronJob,
Status: metav1.ConditionTrue,
Reason: "InvalidSchedule",
Message: fmt.Sprintf("Failed to parse schedule: %v", err),
})
if statusErr := r.Status().Update(ctx, &cronJob); statusErr != nil {
log.Error(statusErr, "Failed to update CronJob status")
}
// we don't really care about requeuing until we get an update that
// fixes the schedule, so don't return an error
return ctrl.Result{}, nil
@@ -430,7 +538,16 @@ func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
}
if tooLate {
log.V(1).Info("missed starting deadline for last run, sleeping till next")
- // TODO(directxman12): events
// Update status condition to reflect missed deadline
meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
Type: typeDegradedCronJob,
Status: metav1.ConditionTrue,
Reason: "MissedSchedule",
Message: fmt.Sprintf("Missed starting deadline for run at %v", missedRun),
})
if statusErr := r.Status().Update(ctx, &cronJob); statusErr != nil {
log.Error(statusErr, "Failed to update CronJob status")
}
return scheduledResult, nil
}

@@ -511,11 +628,32 @@ func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// ...and create it on the cluster
if err := r.Create(ctx, job); err != nil {
log.Error(err, "unable to create Job for CronJob", "job", job)
// Update status condition to reflect the error
meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
Type: typeDegradedCronJob,
Status: metav1.ConditionTrue,
Reason: "JobCreationFailed",
Message: fmt.Sprintf("Failed to create job: %v", err),
})
if statusErr := r.Status().Update(ctx, &cronJob); statusErr != nil {
log.Error(statusErr, "Failed to update CronJob status")
}
return ctrl.Result{}, err
}

log.V(1).Info("created Job for CronJob run", "job", job)

// Update status condition to reflect successful job creation
meta.SetStatusCondition(&cronJob.Status.Conditions, metav1.Condition{
Type: typeProgressingCronJob,
Status: metav1.ConditionTrue,
Reason: "JobCreated",
Message: fmt.Sprintf("Created job %s", job.Name),
})
if statusErr := r.Status().Update(ctx, &cronJob); statusErr != nil {
log.Error(statusErr, "Failed to update CronJob status")
}
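
/*
This reconcile path now writes status several times per pass, so conflicts on
Status().Update become more likely under contention. If they show up in
practice, one common pattern is to wrap the write in retry.RetryOnConflict from
k8s.io/client-go/util/retry -- a sketch, not part of this change, where
condition stands for whichever metav1.Condition is being written:

	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
			return err
		}
		meta.SetStatusCondition(&cronJob.Status.Conditions, condition)
		return r.Status().Update(ctx, &cronJob)
	})
*/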

/*
### 7: Requeue when we either see a running job or it's time for the next scheduled run

@@ -41,6 +41,16 @@ import (

// +kubebuilder:docs-gen:collapse=Imports

// Helper function to check whether a specific condition exists with the expected status
func hasCondition(conditions []metav1.Condition, conditionType string, expectedStatus metav1.ConditionStatus) bool {
for _, condition := range conditions {
if condition.Type == conditionType && condition.Status == expectedStatus {
return true
}
}
return false
}
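
/*
Note that apimachinery already ships an equivalent helper, so the function above
could also be written in terms of it (assuming the test file imports
k8s.io/apimachinery/pkg/api/meta):

	meta.IsStatusConditionPresentAndEqual(conditions, conditionType, expectedStatus)
*/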

/*
The first step to writing a simple integration test is to actually create an instance of CronJob you can run tests against.
Note that to create a CronJob, you’ll need to create a stub CronJob struct that contains your CronJob’s specifications.
@@ -185,6 +195,14 @@ var _ = Describe("CronJob controller", func() {
g.Expect(createdCronjob.Status.Active).To(HaveLen(1), "should have exactly one active job")
g.Expect(createdCronjob.Status.Active[0].Name).To(Equal(JobName), "the wrong job is active")
}, timeout, interval).Should(Succeed(), "should list our active job %s in the active jobs list in status", JobName)

By("By checking that the CronJob status conditions are properly set")
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, cronjobLookupKey, createdCronjob)).To(Succeed())
// Check that the Available condition is set to True when job is active
g.Expect(hasCondition(createdCronjob.Status.Conditions, "Available", metav1.ConditionTrue)).To(BeTrue(),
"CronJob should have Available condition set to True")
}, timeout, interval).Should(Succeed())
})
})
