View | Details | Raw Unified | Return to ticket 11635 | Differences between
and this patch

Collapse All | Expand All

(-)a/src/slurmd/slurmd/req.c (-28 / +43 lines)
Lines 141-146 typedef struct { Link Here
141
	pthread_mutex_t *timer_mutex;
141
	pthread_mutex_t *timer_mutex;
142
} timer_struct_t;
142
} timer_struct_t;
143
143
144
typedef struct {
145
	bool batch_step;
146
	uint32_t job_id;
147
} active_job_t;
148
144
static void _fb_rdlock(void);
149
static void _fb_rdlock(void);
145
static void _fb_rdunlock(void);
150
static void _fb_rdunlock(void);
146
static void _delay_rpc(int host_inx, int host_cnt, int usec_per_rpc);
151
static void _delay_rpc(int host_inx, int host_cnt, int usec_per_rpc);
Lines 150-161 static int _job_limits_match(void *x, void *key); Link Here
150
static bool _job_still_running(uint32_t job_id);
155
static bool _job_still_running(uint32_t job_id);
151
static int  _kill_all_active_steps(uint32_t jobid, int sig,
156
static int  _kill_all_active_steps(uint32_t jobid, int sig,
152
				   int flags, bool batch, uid_t req_uid);
157
				   int flags, bool batch, uid_t req_uid);
153
static void _launch_complete_add(uint32_t job_id);
158
static void _launch_complete_add(uint32_t job_id, bool batch_step);
154
static void _launch_complete_log(char *type, uint32_t job_id);
159
static void _launch_complete_log(char *type, uint32_t job_id);
155
static void _launch_complete_rm(uint32_t job_id);
160
static void _launch_complete_rm(uint32_t job_id);
156
static void _launch_complete_wait(uint32_t job_id);
161
static void _launch_complete_wait(uint32_t job_id);
157
static int  _launch_job_fail(uint32_t job_id, uint32_t slurm_rc);
162
static int  _launch_job_fail(uint32_t job_id, uint32_t slurm_rc);
158
static bool _launch_job_test(uint32_t job_id);
163
static bool _launch_job_test(uint32_t job_id, bool batch_step);
159
static void _note_batch_job_finished(uint32_t job_id);
164
static void _note_batch_job_finished(uint32_t job_id);
160
static int  _prolog_is_running (uint32_t jobid);
165
static int  _prolog_is_running (uint32_t jobid);
161
static int  _step_limits_match(void *x, void *key);
166
static int  _step_limits_match(void *x, void *key);
Lines 251-257 static int job_suspend_size = 0; Link Here
251
#define JOB_STATE_CNT 64
256
#define JOB_STATE_CNT 64
252
static pthread_mutex_t job_state_mutex   = PTHREAD_MUTEX_INITIALIZER;
257
static pthread_mutex_t job_state_mutex   = PTHREAD_MUTEX_INITIALIZER;
253
static pthread_cond_t  job_state_cond    = PTHREAD_COND_INITIALIZER;
258
static pthread_cond_t  job_state_cond    = PTHREAD_COND_INITIALIZER;
254
static uint32_t active_job_id[JOB_STATE_CNT] = {0};
259
static active_job_t active_job_id[JOB_STATE_CNT] = {0};
255
260
256
static pthread_mutex_t prolog_mutex = PTHREAD_MUTEX_INITIALIZER;
261
static pthread_mutex_t prolog_mutex = PTHREAD_MUTEX_INITIALIZER;
257
static pthread_mutex_t prolog_serial_mutex = PTHREAD_MUTEX_INITIALIZER;
262
static pthread_mutex_t prolog_serial_mutex = PTHREAD_MUTEX_INITIALIZER;
Lines 1610-1616 _rpc_launch_tasks(slurm_msg_t *msg) Link Here
1610
				      step_hset, msg->protocol_version);
1615
				      step_hset, msg->protocol_version);
1611
	debug3("%s: return from _forkexec_slurmstepd", __func__);
1616
	debug3("%s: return from _forkexec_slurmstepd", __func__);
1612
1617
1613
	_launch_complete_add(req->step_id.job_id);
1618
	_launch_complete_add(req->step_id.job_id, false);
1614
1619
1615
done:
1620
done:
1616
	if (step_hset)
1621
	if (step_hset)
Lines 2279-2285 static void _rpc_batch_job(slurm_msg_t *msg) Link Here
2279
		goto done;
2284
		goto done;
2280
	}
2285
	}
2281
2286
2282
	if (_launch_job_test(req->job_id)) {
2287
	if (_launch_job_test(req->job_id, true)) {
2283
		error("Job %u already running, do not launch second copy",
2288
		error("Job %u already running, do not launch second copy",
2284
		      req->job_id);
2289
		      req->job_id);
2285
		rc = ESLURM_DUPLICATE_JOB_ID;	/* job already running */
2290
		rc = ESLURM_DUPLICATE_JOB_ID;	/* job already running */
Lines 2441-2447 static void _rpc_batch_job(slurm_msg_t *msg) Link Here
2441
				  (hostset_t)NULL, SLURM_PROTOCOL_VERSION);
2446
				  (hostset_t)NULL, SLURM_PROTOCOL_VERSION);
2442
	debug3("_rpc_batch_job: return from _forkexec_slurmstepd: %d", rc);
2447
	debug3("_rpc_batch_job: return from _forkexec_slurmstepd: %d", rc);
2443
2448
2444
	_launch_complete_add(req->job_id);
2449
	_launch_complete_add(req->job_id, true);
2445
2450
2446
	/* On a busy system, slurmstepd may take a while to respond,
2451
	/* On a busy system, slurmstepd may take a while to respond,
2447
	 * if the job was cancelled in the interim, run through the
2452
	 * if the job was cancelled in the interim, run through the
Lines 4789-4795 extern void record_launched_jobs(void) Link Here
4789
		if (fd == -1)
4794
		if (fd == -1)
4790
			continue; /* step gone */
4795
			continue; /* step gone */
4791
		close(fd);
4796
		close(fd);
4792
		_launch_complete_add(stepd->step_id.job_id);
4797
		_launch_complete_add(stepd->step_id.job_id,
4798
				     (stepd->step_id.step_id ==
4799
				      SLURM_BATCH_SCRIPT));
4793
	}
4800
	}
4794
	list_iterator_destroy(i);
4801
	list_iterator_destroy(i);
4795
	FREE_NULL_LIST(steps);
4802
	FREE_NULL_LIST(steps);
Lines 6003-6030 done: Link Here
6003
	slurm_send_rc_msg(msg, rc);
6010
	slurm_send_rc_msg(msg, rc);
6004
}
6011
}
6005
6012
6006
static void _launch_complete_add(uint32_t job_id)
6013
static void _launch_complete_add(uint32_t job_id, bool batch_step)
6007
{
6014
{
6008
	int j, empty;
6015
	int j, empty;
6009
6016
6010
	slurm_mutex_lock(&job_state_mutex);
6017
	slurm_mutex_lock(&job_state_mutex);
6011
	empty = -1;
6018
	empty = -1;
6012
	for (j = 0; j < JOB_STATE_CNT; j++) {
6019
	for (j = 0; j < JOB_STATE_CNT; j++) {
6013
		if (job_id == active_job_id[j])
6020
		if (job_id == active_job_id[j].job_id) {
6021
			if (batch_step)
6022
				active_job_id[j].batch_step = batch_step;
6014
			break;
6023
			break;
6015
		if ((active_job_id[j] == 0) && (empty == -1))
6024
		}
6025
		if ((active_job_id[j].job_id == 0) && (empty == -1))
6016
			empty = j;
6026
			empty = j;
6017
	}
6027
	}
6018
	if (j >= JOB_STATE_CNT || job_id != active_job_id[j]) {
6028
	if (j >= JOB_STATE_CNT || job_id != active_job_id[j].job_id) {
6019
		if (empty == -1)	/* Discard oldest job */
6029
		if (empty == -1)	/* Discard oldest job */
6020
			empty = 0;
6030
			empty = 0;
6021
		for (j = empty + 1; j < JOB_STATE_CNT; j++) {
6031
		for (j = empty + 1; j < JOB_STATE_CNT; j++) {
6022
			active_job_id[j - 1] = active_job_id[j];
6032
			active_job_id[j - 1] = active_job_id[j];
6023
		}
6033
		}
6024
		active_job_id[JOB_STATE_CNT - 1] = 0;
6034
		active_job_id[JOB_STATE_CNT - 1].job_id = 0;
6035
		active_job_id[JOB_STATE_CNT - 1].batch_step = false;
6025
		for (j = 0; j < JOB_STATE_CNT; j++) {
6036
		for (j = 0; j < JOB_STATE_CNT; j++) {
6026
			if (active_job_id[j] == 0) {
6037
			if (active_job_id[j].job_id == 0) {
6027
				active_job_id[j] = job_id;
6038
				active_job_id[j].job_id = job_id;
6039
				active_job_id[j].batch_step = batch_step;
6028
				break;
6040
				break;
6029
			}
6041
			}
6030
		}
6042
		}
Lines 6042-6049 static void _launch_complete_log(char *type, uint32_t job_id) Link Here
6042
	info("active %s %u", type, job_id);
6054
	info("active %s %u", type, job_id);
6043
	slurm_mutex_lock(&job_state_mutex);
6055
	slurm_mutex_lock(&job_state_mutex);
6044
	for (j = 0; j < JOB_STATE_CNT; j++) {
6056
	for (j = 0; j < JOB_STATE_CNT; j++) {
6045
		if (active_job_id[j] != 0) {
6057
		if (active_job_id[j].job_id != 0) {
6046
			info("active_job_id[%d]=%u", j, active_job_id[j]);
6058
			info("active_job_id[%d]=%u", j,
6059
			     active_job_id[j].job_id);
6047
		}
6060
		}
6048
	}
6061
	}
6049
	slurm_mutex_unlock(&job_state_mutex);
6062
	slurm_mutex_unlock(&job_state_mutex);
Lines 6051-6065 static void _launch_complete_log(char *type, uint32_t job_id) Link Here
6051
}
6064
}
6052
6065
6053
/* Test if we have a specific job ID still running */
6066
/* Test if we have a specific job ID still running */
6054
static bool _launch_job_test(uint32_t job_id)
6067
static bool _launch_job_test(uint32_t job_id, bool batch_step)
6055
{
6068
{
6056
	bool found = false;
6069
	bool found = false;
6057
	int j;
6070
	int j;
6058
6071
6059
	slurm_mutex_lock(&job_state_mutex);
6072
	slurm_mutex_lock(&job_state_mutex);
6060
	for (j = 0; j < JOB_STATE_CNT; j++) {
6073
	for (j = 0; j < JOB_STATE_CNT; j++) {
6061
		if (job_id == active_job_id[j]) {
6074
		if (job_id == active_job_id[j].job_id) {
6062
			found = true;
6075
			if (!batch_step || active_job_id[j].batch_step)
6076
				found = true;
6063
			break;
6077
			break;
6064
		}
6078
		}
6065
	}
6079
	}
Lines 6074-6087 static void _launch_complete_rm(uint32_t job_id) Link Here
6074
6088
6075
	slurm_mutex_lock(&job_state_mutex);
6089
	slurm_mutex_lock(&job_state_mutex);
6076
	for (j = 0; j < JOB_STATE_CNT; j++) {
6090
	for (j = 0; j < JOB_STATE_CNT; j++) {
6077
		if (job_id == active_job_id[j])
6091
		if (job_id == active_job_id[j].job_id)
6078
			break;
6092
			break;
6079
	}
6093
	}
6080
	if (j < JOB_STATE_CNT && job_id == active_job_id[j]) {
6094
	if (j < JOB_STATE_CNT && job_id == active_job_id[j].job_id) {
6081
		for (j = j + 1; j < JOB_STATE_CNT; j++) {
6095
		for (j = j + 1; j < JOB_STATE_CNT; j++) {
6082
			active_job_id[j - 1] = active_job_id[j];
6096
			active_job_id[j - 1] = active_job_id[j];
6083
		}
6097
		}
6084
		active_job_id[JOB_STATE_CNT - 1] = 0;
6098
		active_job_id[JOB_STATE_CNT - 1].job_id = 0;
6099
		active_job_id[JOB_STATE_CNT - 1].batch_step = false;
6085
	}
6100
	}
6086
	slurm_mutex_unlock(&job_state_mutex);
6101
	slurm_mutex_unlock(&job_state_mutex);
6087
	_launch_complete_log("job remove", job_id);
6102
	_launch_complete_log("job remove", job_id);
Lines 6098-6106 static void _launch_complete_wait(uint32_t job_id) Link Here
6098
	for (i = 0; ; i++) {
6113
	for (i = 0; ; i++) {
6099
		empty = -1;
6114
		empty = -1;
6100
		for (j = 0; j < JOB_STATE_CNT; j++) {
6115
		for (j = 0; j < JOB_STATE_CNT; j++) {
6101
			if (job_id == active_job_id[j])
6116
			if (job_id == active_job_id[j].job_id)
6102
				break;
6117
				break;
6103
			if ((active_job_id[j] == 0) && (empty == -1))
6118
			if ((active_job_id[j].job_id == 0) && (empty == -1))
6104
				empty = j;
6119
				empty = j;
6105
		}
6120
		}
6106
		if (j < JOB_STATE_CNT)	/* Found job, ready to return */
6121
		if (j < JOB_STATE_CNT)	/* Found job, ready to return */
Lines 6120-6129 static void _launch_complete_wait(uint32_t job_id) Link Here
6120
		for (j = empty + 1; j < JOB_STATE_CNT; j++) {
6135
		for (j = empty + 1; j < JOB_STATE_CNT; j++) {
6121
			active_job_id[j - 1] = active_job_id[j];
6136
			active_job_id[j - 1] = active_job_id[j];
6122
		}
6137
		}
6123
		active_job_id[JOB_STATE_CNT - 1] = 0;
6138
		active_job_id[JOB_STATE_CNT - 1].job_id = 0;
6139
		active_job_id[JOB_STATE_CNT - 1].batch_step = false;
6124
		for (j = 0; j < JOB_STATE_CNT; j++) {
6140
		for (j = 0; j < JOB_STATE_CNT; j++) {
6125
			if (active_job_id[j] == 0) {
6141
			if (active_job_id[j].job_id == 0) {
6126
				active_job_id[j] = job_id;
6142
				active_job_id[j].job_id = job_id;
6127
				break;
6143
				break;
6128
			}
6144
			}
6129
		}
6145
		}
6130
- 

Return to ticket 11635