Lines 141-146
@@ -141,6 +141,11 @@ typedef struct {
 	pthread_mutex_t *timer_mutex;
 } timer_struct_t;
 
+typedef struct {
+	bool batch_step;
+	uint32_t job_id;
+} active_job_t;
+
 static void _fb_rdlock(void);
 static void _fb_rdunlock(void);
 static void _delay_rpc(int host_inx, int host_cnt, int usec_per_rpc);
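For orientation (this sketch is not part of the patch; JOB_STATE_CNT and the field names are borrowed from the hunks below), the change turns the launch-tracking table from a flat array of job IDs into an array of small records, so each registered launch also remembers whether the batch script itself has started:

/* Standalone sketch, not slurmd code: models the new active_job_t table. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define JOB_STATE_CNT 64

typedef struct {
	bool batch_step;	/* true once the batch script has launched */
	uint32_t job_id;	/* 0 marks an unused slot */
} active_job_t;

static active_job_t active_job_id[JOB_STATE_CNT] = {0};

int main(void)
{
	/* Previously a slot was a bare uint32_t job ID; lookups now
	 * compare against the .job_id member instead. */
	active_job_id[0] = (active_job_t){ .batch_step = false, .job_id = 1234 };

	for (int j = 0; j < JOB_STATE_CNT; j++) {
		if (active_job_id[j].job_id == 1234)
			printf("slot %d: job %u, batch_step=%d\n",
			       j, active_job_id[j].job_id,
			       (int) active_job_id[j].batch_step);
	}
	return 0;
}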
Lines 150-161
@@ -150,12 +155,12 @@ static int _job_limits_match(void *x, void *key);
 static bool _job_still_running(uint32_t job_id);
 static int _kill_all_active_steps(uint32_t jobid, int sig,
 				  int flags, bool batch, uid_t req_uid);
-static void _launch_complete_add(uint32_t job_id);
+static void _launch_complete_add(uint32_t job_id, bool batch_step);
 static void _launch_complete_log(char *type, uint32_t job_id);
 static void _launch_complete_rm(uint32_t job_id);
 static void _launch_complete_wait(uint32_t job_id);
 static int _launch_job_fail(uint32_t job_id, uint32_t slurm_rc);
-static bool _launch_job_test(uint32_t job_id);
+static bool _launch_job_test(uint32_t job_id, bool batch_step);
 static void _note_batch_job_finished(uint32_t job_id);
 static int _prolog_is_running (uint32_t jobid);
 static int _step_limits_match(void *x, void *key);
Lines 251-257
@@ -251,7 +256,7 @@ static int job_suspend_size = 0;
 #define JOB_STATE_CNT 64
 static pthread_mutex_t job_state_mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t job_state_cond = PTHREAD_COND_INITIALIZER;
-static uint32_t active_job_id[JOB_STATE_CNT] = {0};
+static active_job_t active_job_id[JOB_STATE_CNT] = {0};
 
 static pthread_mutex_t prolog_mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_mutex_t prolog_serial_mutex = PTHREAD_MUTEX_INITIALIZER;
Lines 1610-1616
@@ -1610,7 +1615,7 @@ _rpc_launch_tasks(slurm_msg_t *msg)
 			    step_hset, msg->protocol_version);
 	debug3("%s: return from _forkexec_slurmstepd", __func__);
 
-	_launch_complete_add(req->step_id.job_id);
+	_launch_complete_add(req->step_id.job_id, false);
 
 done:
 	if (step_hset)
Lines 2279-2285
@@ -2279,7 +2284,7 @@ static void _rpc_batch_job(slurm_msg_t *msg)
 		goto done;
 	}
 
-	if (_launch_job_test(req->job_id)) {
+	if (_launch_job_test(req->job_id, true)) {
 		error("Job %u already running, do not launch second copy",
 		      req->job_id);
 		rc = ESLURM_DUPLICATE_JOB_ID;	/* job already running */
Lines 2441-2447
@@ -2441,7 +2446,7 @@ static void _rpc_batch_job(slurm_msg_t *msg)
 				   (hostset_t)NULL, SLURM_PROTOCOL_VERSION);
 	debug3("_rpc_batch_job: return from _forkexec_slurmstepd: %d", rc);
 
-	_launch_complete_add(req->job_id);
+	_launch_complete_add(req->job_id, true);
 
 	/* On a busy system, slurmstepd may take a while to respond,
 	 * if the job was cancelled in the interim, run through the
Lines 4789-4795
@@ -4789,7 +4794,9 @@ extern void record_launched_jobs(void)
 		if (fd == -1)
 			continue;	/* step gone */
 		close(fd);
-		_launch_complete_add(stepd->step_id.job_id);
+		_launch_complete_add(stepd->step_id.job_id,
+				     (stepd->step_id.step_id ==
+				      SLURM_BATCH_SCRIPT));
 	}
 	list_iterator_destroy(i);
 	FREE_NULL_LIST(steps);
Lines 6003-6030
@@ -6003,28 +6010,33 @@ done:
 	slurm_send_rc_msg(msg, rc);
 }
 
-static void _launch_complete_add(uint32_t job_id)
+static void _launch_complete_add(uint32_t job_id, bool batch_step)
 {
 	int j, empty;
 
 	slurm_mutex_lock(&job_state_mutex);
 	empty = -1;
 	for (j = 0; j < JOB_STATE_CNT; j++) {
-		if (job_id == active_job_id[j])
+		if (job_id == active_job_id[j].job_id) {
+			if (batch_step)
+				active_job_id[j].batch_step = batch_step;
 			break;
-		if ((active_job_id[j] == 0) && (empty == -1))
+		}
+		if ((active_job_id[j].job_id == 0) && (empty == -1))
 			empty = j;
 	}
-	if (j >= JOB_STATE_CNT || job_id != active_job_id[j]) {
+	if (j >= JOB_STATE_CNT || job_id != active_job_id[j].job_id) {
 		if (empty == -1)	/* Discard oldest job */
 			empty = 0;
 		for (j = empty + 1; j < JOB_STATE_CNT; j++) {
 			active_job_id[j - 1] = active_job_id[j];
 		}
-		active_job_id[JOB_STATE_CNT - 1] = 0;
+		active_job_id[JOB_STATE_CNT - 1].job_id = 0;
+		active_job_id[JOB_STATE_CNT - 1].batch_step = false;
 		for (j = 0; j < JOB_STATE_CNT; j++) {
-			if (active_job_id[j] == 0) {
-				active_job_id[j] = job_id;
+			if (active_job_id[j].job_id == 0) {
+				active_job_id[j].job_id = job_id;
+				active_job_id[j].batch_step = batch_step;
 				break;
 			}
 		}
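A rough illustration of the behavior the revised _launch_complete_add() gives. This is a standalone sketch, not slurmd code: locking is omitted and the mid-table compaction of the real function is simplified away; what it shows is that re-registering a known job only upgrades its batch_step flag in place, while a full table drops its oldest entry.

/* Standalone sketch of the upsert behavior in the patched function. */
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define JOB_STATE_CNT 64

typedef struct {
	bool batch_step;
	uint32_t job_id;
} active_job_t;

static active_job_t active_job_id[JOB_STATE_CNT];

static void add(uint32_t job_id, bool batch_step)
{
	int j, empty = -1;

	for (j = 0; j < JOB_STATE_CNT; j++) {
		if (job_id == active_job_id[j].job_id) {
			if (batch_step)	/* upgrade the flag, never clear it */
				active_job_id[j].batch_step = true;
			return;
		}
		if ((active_job_id[j].job_id == 0) && (empty == -1))
			empty = j;
	}
	if (empty == -1) {	/* table full: discard the oldest entry */
		for (j = 1; j < JOB_STATE_CNT; j++)
			active_job_id[j - 1] = active_job_id[j];
		empty = JOB_STATE_CNT - 1;
	}
	active_job_id[empty] = (active_job_t){ .batch_step = batch_step,
					       .job_id = job_id };
}

int main(void)
{
	add(42, false);		/* e.g. a task-launch RPC */
	add(42, true);		/* batch script for the same job arrives */
	assert(active_job_id[0].job_id == 42);
	assert(active_job_id[0].batch_step);	/* flag upgraded in place */
	add(43, false);
	assert(active_job_id[1].job_id == 43);	/* new job, next free slot */
	return 0;
}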
Lines 6042-6049
@@ -6042,8 +6054,9 @@ static void _launch_complete_log(char *type, uint32_t job_id)
 	info("active %s %u", type, job_id);
 	slurm_mutex_lock(&job_state_mutex);
 	for (j = 0; j < JOB_STATE_CNT; j++) {
-		if (active_job_id[j] != 0) {
-			info("active_job_id[%d]=%u", j, active_job_id[j]);
+		if (active_job_id[j].job_id != 0) {
+			info("active_job_id[%d]=%u", j,
+			     active_job_id[j].job_id);
 		}
 	}
 	slurm_mutex_unlock(&job_state_mutex);
Lines 6051-6065
@@ -6051,15 +6064,16 @@ static void _launch_complete_log(char *type, uint32_t job_id)
 }
 
 /* Test if we have a specific job ID still running */
-static bool _launch_job_test(uint32_t job_id)
+static bool _launch_job_test(uint32_t job_id, bool batch_step)
 {
 	bool found = false;
 	int j;
 
 	slurm_mutex_lock(&job_state_mutex);
 	for (j = 0; j < JOB_STATE_CNT; j++) {
-		if (job_id == active_job_id[j]) {
-			found = true;
+		if (job_id == active_job_id[j].job_id) {
+			if (!batch_step || active_job_id[j].batch_step)
+				found = true;
 			break;
 		}
 	}
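The extra batch_step argument matters at the _rpc_batch_job() call site above (Lines 2279-2285): a job known only from non-batch task launches should no longer be reported as a duplicate when its batch script arrives. A small standalone check of that predicate (locking omitted, names borrowed from the patch; not slurmd code):

/* Standalone sketch of the patched _launch_job_test() logic: with
 * batch_step == true, an entry only counts as a duplicate if the batch
 * script itself was already registered for that job ID. */
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define JOB_STATE_CNT 64

typedef struct {
	bool batch_step;
	uint32_t job_id;
} active_job_t;

static active_job_t active_job_id[JOB_STATE_CNT];

static bool launch_job_test(uint32_t job_id, bool batch_step)
{
	for (int j = 0; j < JOB_STATE_CNT; j++) {
		if (job_id == active_job_id[j].job_id)
			return !batch_step || active_job_id[j].batch_step;
	}
	return false;
}

int main(void)
{
	/* Job 7 is known only from a task launch, as _rpc_launch_tasks()
	 * now records it with batch_step = false. */
	active_job_id[0] = (active_job_t){ .batch_step = false, .job_id = 7 };

	/* The batch-job RPC asks with batch_step = true: not a duplicate. */
	assert(!launch_job_test(7, true));

	/* Once the batch script is registered, a second copy is rejected. */
	active_job_id[0].batch_step = true;
	assert(launch_job_test(7, true));

	return 0;
}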
Lines 6074-6087
@@ -6074,14 +6088,15 @@ static void _launch_complete_rm(uint32_t job_id)
 
 	slurm_mutex_lock(&job_state_mutex);
 	for (j = 0; j < JOB_STATE_CNT; j++) {
-		if (job_id == active_job_id[j])
+		if (job_id == active_job_id[j].job_id)
 			break;
 	}
-	if (j < JOB_STATE_CNT && job_id == active_job_id[j]) {
+	if (j < JOB_STATE_CNT && job_id == active_job_id[j].job_id) {
 		for (j = j + 1; j < JOB_STATE_CNT; j++) {
 			active_job_id[j - 1] = active_job_id[j];
 		}
-		active_job_id[JOB_STATE_CNT - 1] = 0;
+		active_job_id[JOB_STATE_CNT - 1].job_id = 0;
+		active_job_id[JOB_STATE_CNT - 1].batch_step = false;
 	}
 	slurm_mutex_unlock(&job_state_mutex);
 	_launch_complete_log("job remove", job_id);
Lines 6098-6106
@@ -6098,9 +6113,9 @@ static void _launch_complete_wait(uint32_t job_id)
 	for (i = 0; ; i++) {
 		empty = -1;
 		for (j = 0; j < JOB_STATE_CNT; j++) {
-			if (job_id == active_job_id[j])
+			if (job_id == active_job_id[j].job_id)
 				break;
-			if ((active_job_id[j] == 0) && (empty == -1))
+			if ((active_job_id[j].job_id == 0) && (empty == -1))
 				empty = j;
 		}
 		if (j < JOB_STATE_CNT)	/* Found job, ready to return */
Lines 6120-6129
@@ -6120,10 +6135,11 @@ static void _launch_complete_wait(uint32_t job_id)
 		for (j = empty + 1; j < JOB_STATE_CNT; j++) {
 			active_job_id[j - 1] = active_job_id[j];
 		}
-		active_job_id[JOB_STATE_CNT - 1] = 0;
+		active_job_id[JOB_STATE_CNT - 1].job_id = 0;
+		active_job_id[JOB_STATE_CNT - 1].batch_step = false;
 		for (j = 0; j < JOB_STATE_CNT; j++) {
-			if (active_job_id[j] == 0) {
-				active_job_id[j] = job_id;
+			if (active_job_id[j].job_id == 0) {
+				active_job_id[j].job_id = job_id;
 				break;
 			}
 		}