Sfoglia il codice sorgente

Update the publish API to return SA_AIS_ERR_TRY_AGAIN to the application
instead of handling it in the library.

(Logical change 1.145)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@516 fd59a12c-fef9-0310-b244-a6a79926bd2f

Mark Haverkamp 21 anni fa
parent
commit
622b861710
2 ha cambiato i file con 45 aggiunte e 54 eliminazioni
  1. 14 44
      lib/evt.c
  2. 31 10
      test/publish.c

+ 14 - 44
lib/evt.c

@@ -178,10 +178,6 @@ struct event_data_instance {
 };
 };
 
 
 
 
-#define PUB_SLEEP_NSEC 0L;
-#define PUB_SLEEP_SEC 1
-#define PUB_SLEEP_TRYS 20
-
 #define min(a,b) ((a) < (b) ? (a) : (b))
 #define min(a,b) ((a) < (b) ? (a) : (b))
 
 
 /*
 /*
@@ -1547,7 +1543,6 @@ saEvtEventPublish(
 	size_t pattern_size;
 	size_t pattern_size;
 	struct event_pattern *patterns;
 	struct event_pattern *patterns;
 	void   *data_start;
 	void   *data_start;
-	int pub_sleep_trys = PUB_SLEEP_TRYS;
 
 
 	if (eventDataSize > SA_EVT_DATA_MAX_LEN) {
 	if (eventDataSize > SA_EVT_DATA_MAX_LEN) {
 		error = SA_AIS_ERR_INVALID_PARAM;
 		error = SA_AIS_ERR_INVALID_PARAM;
@@ -1628,50 +1623,25 @@ saEvtEventPublish(
 	req->led_priority = edi->edi_priority;
 	req->led_priority = edi->edi_priority;
 	req->led_publisher_name = edi->edi_pub_name;
 	req->led_publisher_name = edi->edi_pub_name;
 
 
-	while (--pub_sleep_trys) {
-		pthread_mutex_lock(&evti->ei_mutex);
-		error = saSendRetry(evti->ei_fd, req, req->led_head.size, MSG_NOSIGNAL);
-		if (error != SA_AIS_OK) {
-			pthread_mutex_unlock (&evti->ei_mutex);
-			goto pub_put3_free;
-		}
-		error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, 
-					MESSAGE_RES_EVT_PUBLISH);
-	
+	pthread_mutex_lock(&evti->ei_mutex);
+	error = saSendRetry(evti->ei_fd, req, req->led_head.size, MSG_NOSIGNAL);
+	free(req);
+	if (error != SA_AIS_OK) {
 		pthread_mutex_unlock (&evti->ei_mutex);
 		pthread_mutex_unlock (&evti->ei_mutex);
+		goto pub_put3;
+	}
 
 
-		if (error != SA_AIS_OK) {
-			goto pub_put3_free;
-		}
-
-		error = res.iep_head.error;
-
-		if (error == SA_AIS_ERR_TRY_AGAIN) {
-			struct timespec ts;
-			struct timespec rem;
-			ts.tv_sec = PUB_SLEEP_SEC;
-			ts.tv_nsec = PUB_SLEEP_NSEC;
-pub_sleep:
-			if (nanosleep(&ts, &rem) < 0) {
-				if (errno == EINTR) {
-					ts = rem;
-					goto pub_sleep;
-				}
-				error = SA_AIS_ERR_TIMEOUT;
-				goto pub_put3_free;
-			}
-			continue;
-		}
-
-		*eventId = res.iep_event_id;
-		break;
+	error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, 
+					MESSAGE_RES_EVT_PUBLISH);
+	pthread_mutex_unlock (&evti->ei_mutex);
+	if (error != SA_AIS_OK) {
+		goto pub_put3;
 	}
 	}
-	if (error == SA_AIS_ERR_TRY_AGAIN) {
-		error = SA_AIS_ERR_TIMEOUT;
+	error = res.iep_head.error;
+	if (error == SA_AIS_OK) {
+		*eventId = res.iep_event_id;
 	}
 	}
 
 
-pub_put3_free:
-	free(req);
 pub_put3:
 pub_put3:
 	saHandleInstancePut (&evt_instance_handle_db, eci->eci_instance_handle);
 	saHandleInstancePut (&evt_instance_handle_db, eci->eci_instance_handle);
 pub_put2:
 pub_put2:

+ 31 - 10
test/publish.c

@@ -16,6 +16,8 @@
 
 
 // #define EVENT_SUBSCRIBE
 // #define EVENT_SUBSCRIBE
 
 
+#define PUB_RETRIES 100
+
 extern int get_sa_error(SaAisErrorT, char *, int);
 extern int get_sa_error(SaAisErrorT, char *, int);
 char result_buf[256];
 char result_buf[256];
 int result_buf_len = sizeof(result_buf);
 int result_buf_len = sizeof(result_buf);
@@ -124,6 +126,8 @@ test_pub()
 	uint64_t test_retention;
 	uint64_t test_retention;
 	int fd;
 	int fd;
 	int i;
 	int i;
+	int j;
+	int did_dot;
 
 
 	SaEvtEventIdT event_id;
 	SaEvtEventIdT event_id;
 #ifdef EVENT_SUBSCRIBE
 #ifdef EVENT_SUBSCRIBE
@@ -197,15 +201,29 @@ test_pub()
 	}
 	}
 
 
 	for (i = 0; i < pub_count; i++) {
 	for (i = 0; i < pub_count; i++) {
-	result = saEvtEventPublish(event_handle, user_data, 
+		did_dot = 0;
+		for (j = 0; j < PUB_RETRIES; j++) {
+			result = saEvtEventPublish(event_handle, user_data, 
 						user_data_size, &event_id);
 						user_data_size, &event_id);
-	if (result != SA_AIS_OK) {
-		get_sa_error(result, result_buf, result_buf_len);
-		printf("event Publish result(2): %s\n", result_buf);
-		exit(result);
-	}
+			if (result == SA_AIS_ERR_TRY_AGAIN) {
+				sleep(1);
+				fprintf(stderr, ".");
+				did_dot = 1;
+				continue;
+			}
+			if (result != SA_AIS_OK) {
+				get_sa_error(result, result_buf, result_buf_len);
+				printf("event Publish result(2): %s\n", result_buf);
+				exit(result);
+			}
+			if (did_dot) {
+				printf("\n");
+			}
+			break;
+		}
 
 
-	printf("Published event ID: 0x%llx\n", (unsigned long long)event_id);
+		printf("Published event ID: 0x%llx\n", 
+				(unsigned long long)event_id);
 	}
 	}
 
 
 	/*
 	/*
@@ -287,7 +305,8 @@ event_callback( SaEvtSubscriptionIdT subscription_id,
 
 
 	printf("event_callback called\n");
 	printf("event_callback called\n");
 	printf("sub ID: %x\n", subscription_id);
 	printf("sub ID: %x\n", subscription_id);
-	printf("event_handle %llx\n", (unsigned long long)event_handle);
+	printf("event_handle %llx\n", 
+			(unsigned long long)event_handle);
 	printf("event data size %d\n", event_data_size);
 	printf("event data size %d\n", event_data_size);
 
 
 	evt_pat_get_array.patternsNumber = 4;
 	evt_pat_get_array.patternsNumber = 4;
@@ -311,9 +330,11 @@ event_callback( SaEvtSubscriptionIdT subscription_id,
 	}
 	}
 
 
 	printf("priority: 0x%x\n", priority);
 	printf("priority: 0x%x\n", priority);
-	printf("retention: 0x%llx\n", (unsigned long long)retention_time);
+	printf("retention: 0x%llx\n", 
+			(unsigned long long)retention_time);
 	printf("publisher name content: \"%s\"\n", publisher_name.value); 
 	printf("publisher name content: \"%s\"\n", publisher_name.value); 
-	printf("event id: 0x%llx\n", (unsigned long long)event_id);
+	printf("event id: 0x%llx\n", 
+			(unsigned long long)event_id);
 evt_free:
 evt_free:
 	result = saEvtEventFree(event_handle);
 	result = saEvtEventFree(event_handle);
 	get_sa_error(result, result_buf, result_buf_len);
 	get_sa_error(result, result_buf, result_buf_len);