diff --git a/config.m4 b/config.m4 index 363a2d25f0..c95e5885ea 100644 --- a/config.m4 +++ b/config.m4 @@ -18,7 +18,20 @@ if test "$PHP_PHACTOR" != "no"; then src/classes/actor_system.c \ src/classes/actor.c \ src/classes/actor_ref.c \ - src/classes/supervisor.c, $ext_shared,, -DZEND_ENABLE_STATIC_TSRMLS_CACHE=1) + src/classes/supervisor.c \ + src/classes/non-blocking/file_handle.c, $ext_shared,, -DZEND_ENABLE_STATIC_TSRMLS_CACHE=1) + + PHP_ADD_INCLUDE(/usr/local/include) + PHP_CHECK_LIBRARY(uv, uv_version, [ + PHP_ADD_LIBRARY_WITH_PATH(uv, /usr/local/$PHP_LIBDIR, PHACTOR_SHARED_LIBADD) + AC_DEFINE(HAVE_UVLIB,1,[ ]) + ],[ + AC_MSG_ERROR([wrong uv library version or library not found]) + ],[ + -L/usr/local/$PHP_LIBDIR -lm + ]) + PHP_SUBST([CFLAGS]) + PHP_SUBST(PHACTOR_SHARED_LIBADD) EXTRA_CFLAGS="$EXTRA_CFLAGS -std=gnu99" PHP_SUBST(EXTRA_CFLAGS) diff --git a/phactor.c b/phactor.c index a8cd0faec5..7bc6a14b2e 100644 --- a/phactor.c +++ b/phactor.c @@ -29,6 +29,7 @@ #include "src/classes/actor.h" #include "src/classes/actor_ref.h" #include "src/classes/supervisor.h" +#include "src/classes/non-blocking/file_handle.h" #ifndef ZTS # error "Zend Thread Safety (ZTS) mode is required" @@ -55,6 +56,7 @@ PHP_MINIT_FUNCTION(phactor) ph_actor_ce_init(); ph_actor_ref_ce_init(); ph_supervisor_ce_init(); + ph_file_handle_ce_init(); pthread_mutex_init(&global_actor_id_lock, NULL); pthread_mutex_init(&global_tree_number_lock, NULL); diff --git a/php_phactor.h b/php_phactor.h index d10df8a6a1..35634d3703 100644 --- a/php_phactor.h +++ b/php_phactor.h @@ -35,11 +35,16 @@ extern zend_module_entry phactor_module_entry; #define PHACTOR_CG(ls, v) PHACTOR_CTX(ls, compiler_globals_id, zend_compiler_globals*, v) #define PHACTOR_SG(ls, v) PHACTOR_CTX(ls, sapi_globals_id, sapi_globals_struct*, v) +struct _ph_actor_t; +struct _ph_thread_t; + ZEND_EXTERN_MODULE_GLOBALS(phactor) ZEND_BEGIN_MODULE_GLOBALS(phactor) HashTable op_array_file_names; int allowed_to_construct_object; + struct _ph_actor_t *currently_executing_actor; + struct _ph_thread_t *ph_thread; ZEND_END_MODULE_GLOBALS(phactor) typedef struct _common_strings_t { diff --git a/src/classes/actor.c b/src/classes/actor.c index e26ba91f6b..80b547b447 100644 --- a/src/classes/actor.c +++ b/src/classes/actor.c @@ -26,8 +26,6 @@ #include "src/classes/actor_system.h" extern ph_actor_system_t *actor_system; -extern __thread ph_actor_t *currently_processing_actor; -extern __thread int thread_offset; extern zend_class_entry *ph_ActorRef_ce; zend_object_handlers ph_Actor_handlers; @@ -75,7 +73,7 @@ void ph_actor_remove_from_table(void *actor_void) void ph_actor_mark_for_removal(void *actor_void) { ph_actor_t *actor = actor_void; - ph_vector_t *actor_removals = PHACTOR_G(actor_system)->actor_removals + actor->thread_offset; + ph_vector_t *actor_removals = &actor->ph_thread->actor_removals; pthread_mutex_lock(&actor->lock); actor->state = PH_ACTOR_SHUTTING_DOWN; @@ -201,6 +199,7 @@ ph_actor_t *ph_actor_create(ph_string_t *name, ph_string_t *ref, ph_string_t *cl new_actor->ctor_args = ctor_args; new_actor->ctor_argc = ctor_argc; new_actor->restart_count_streak = 0; + new_actor->restart_count = 0; new_actor->tree_number = -1; ph_queue_init(&new_actor->mailbox, ph_msg_free); @@ -228,21 +227,20 @@ static void receive_block(ph_actor_t *actor, zval *return_value) // @todo possible optimisation: if task queue is empty, just skip the next 7 lines if (ph_queue_size(&actor->mailbox)) { - ph_thread_t *thread = PHACTOR_G(actor_system)->worker_threads + actor->thread_offset; ph_task_t *task = ph_task_create_resume_actor(actor); - pthread_mutex_lock(&thread->tasks.lock); - ph_queue_push(&thread->tasks, task); - pthread_mutex_unlock(&thread->tasks.lock); + pthread_mutex_lock(&actor->ph_thread->tasks.lock); + ph_queue_push(&actor->ph_thread->tasks, task); + pthread_mutex_unlock(&actor->ph_thread->tasks.lock); } pthread_mutex_unlock(&actor->lock); - ph_vmcontext_swap(&actor->internal->context.vmc, &PHACTOR_G(actor_system)->worker_threads[thread_offset].context.vmc); + ph_vmcontext_swap(&actor->internal->context.vmc, &PHACTOR_ZG(ph_thread)->context.vmc); #ifdef PH_FIXED_STACK_SIZE - ph_mcontext_swap(&actor->internal->context.mc, &PHACTOR_G(actor_system)->worker_threads[thread_offset].context.mc); + ph_mcontext_swap(&actor->internal->context.mc, &PHACTOR_ZG(ph_thread)->context.mc); #else - ph_mcontext_interrupt(&actor->internal->context.mc, &PHACTOR_G(actor_system)->worker_threads[thread_offset].context.mc); + ph_mcontext_interrupt(&actor->internal->context.mc, &PHACTOR_ZG(ph_thread)->context.mc); #endif pthread_mutex_lock(&actor->lock); @@ -258,7 +256,7 @@ static zend_execute_data dummy_execute_data; void process_message_handler(void) { - ph_actor_t *actor = currently_processing_actor; + ph_actor_t *actor = PHACTOR_ZG(currently_executing_actor); zend_object *object = &actor->internal->obj; // from tls zend_function *receive_function; zend_fcall_info fci; @@ -283,7 +281,7 @@ void process_message_handler(void) EG(current_execute_data) = NULL; - ph_vmcontext_swap(&actor->internal->context.vmc, &PHACTOR_G(actor_system)->worker_threads[thread_offset].context.vmc); + ph_vmcontext_swap(&actor->internal->context.vmc, &PHACTOR_ZG(ph_thread)->context.vmc); if (result == FAILURE && !EG(exception)) { zend_error_noreturn(E_CORE_ERROR, "Couldn't execute method %s%s%s", ZSTR_VAL(object->ce->name), "::", "receive"); @@ -305,7 +303,7 @@ void process_message_handler(void) } #ifdef PH_FIXED_STACK_SIZE - ph_mcontext_set(&PHACTOR_G(actor_system)->worker_threads[thread_offset].context.mc); + ph_mcontext_set(&PHACTOR_ZG(ph_thread)->context.mc); #endif } diff --git a/src/classes/actor.h b/src/classes/actor.h index 1acf24cbd7..fd6fb12eb8 100644 --- a/src/classes/actor.h +++ b/src/classes/actor.h @@ -26,6 +26,8 @@ #include "src/ds/ph_hashtable.h" #include "src/classes/supervisor.h" +struct _ph_thread_t; + typedef enum _ph_actor_state_t { PH_ACTOR_SPAWNING, // prevents invoking receiveBlock in the constructor PH_ACTOR_IDLE, // waiting for something - needs context restoring @@ -33,6 +35,7 @@ typedef enum _ph_actor_state_t { PH_ACTOR_TERMINATED, // actor terminated by supervisor - await for further action PH_ACTOR_CRASHED, // await for further action (restart or free) PH_ACTOR_RESTARTING, // actor is going through the restart cycle + PH_ACTOR_BLOCKING, // prevents an actor from being rescheduled via a new message PH_ACTOR_SHUTTING_DOWN // actor will be destroyed soon } ph_actor_state_t; @@ -49,9 +52,10 @@ typedef struct _ph_actor_t { ph_string_t class_name; ph_entry_t *ctor_args; int ctor_argc; - int thread_offset; + struct _ph_thread_t *ph_thread; ph_actor_state_t state; int restart_count_streak; + int restart_count; int tree_number; struct _ph_actor_t *supervisor; ph_supervision_t *supervision; diff --git a/src/classes/actor_ref.c b/src/classes/actor_ref.c index b0a50b701f..ef0b3c1d39 100644 --- a/src/classes/actor_ref.c +++ b/src/classes/actor_ref.c @@ -162,10 +162,9 @@ void ph_actor_ref_create(zval *zobj, zend_string *actor_class, zval *ctor_args, ph_hashtable_insert(&PHACTOR_G(actor_system)->actors_by_ref, new_actor_ref, new_actor); pthread_mutex_unlock(&PHACTOR_G(actor_system)->actors_by_ref.lock); - int thread_offset = php_mt_rand_range(0, PHACTOR_G(actor_system)->thread_count - 1); - ph_thread_t *thread = PHACTOR_G(actor_system)->worker_threads + thread_offset; + ph_thread_t *thread = PHACTOR_G(actor_system)->worker_threads + php_mt_rand_range(0, PHACTOR_G(actor_system)->thread_count - 1); - new_actor->thread_offset = thread_offset; + new_actor->ph_thread = thread; pthread_mutex_lock(&thread->tasks.lock); ph_queue_push(&thread->tasks, task); diff --git a/src/classes/actor_system.c b/src/classes/actor_system.c index 288776033d..c003fd4520 100644 --- a/src/classes/actor_system.c +++ b/src/classes/actor_system.c @@ -34,9 +34,6 @@ #include "src/classes/actor_system.h" ph_actor_system_t *actor_system; -__thread ph_actor_t *currently_processing_actor; -__thread ph_thread_t *thread; -__thread int thread_offset; ph_thread_t main_thread; zend_object_handlers ph_ActorSystem_handlers; @@ -50,11 +47,9 @@ void send_local_message(ph_actor_t *to_actor, ph_task_t *task) ph_queue_push(&to_actor->mailbox, message); if (to_actor->state == PH_ACTOR_IDLE && ph_queue_size(&to_actor->mailbox) == 1) { - ph_thread_t *thread = PHACTOR_G(actor_system)->worker_threads + to_actor->thread_offset; - - pthread_mutex_lock(&thread->tasks.lock); - ph_queue_push(&thread->tasks, ph_task_create_resume_actor(to_actor)); - pthread_mutex_unlock(&thread->tasks.lock); + pthread_mutex_lock(&to_actor->ph_thread->tasks.lock); + ph_queue_push(&to_actor->ph_thread->tasks, ph_task_create_resume_actor(to_actor)); + pthread_mutex_unlock(&to_actor->ph_thread->tasks.lock); } pthread_mutex_unlock(&to_actor->lock); } @@ -86,9 +81,9 @@ void process_message(ph_actor_t *for_actor) ph_vmcontext_set(&for_actor->internal->context.vmc); // swap into process_message_handler #ifdef PH_FIXED_STACK_SIZE - ph_mcontext_swap(&PHACTOR_G(actor_system)->worker_threads[thread_offset].context.mc, &for_actor->internal->context.mc); + ph_mcontext_swap(&PHACTOR_ZG(ph_thread)->context.mc, &for_actor->internal->context.mc); #else - ph_mcontext_start(&PHACTOR_G(actor_system)->worker_threads[thread_offset].context.mc, for_actor->internal->context.mc.cb); + ph_mcontext_start(&PHACTOR_ZG(ph_thread)->context.mc, for_actor->internal->context.mc.cb); #endif } @@ -97,9 +92,9 @@ void resume_actor(ph_actor_t *actor) ph_vmcontext_set(&actor->internal->context.vmc); // swap back into receive_block #ifdef PH_FIXED_STACK_SIZE - ph_mcontext_swap(&PHACTOR_G(actor_system)->worker_threads[thread_offset].context.mc, &actor->internal->context.mc); + ph_mcontext_swap(&PHACTOR_ZG(ph_thread)->context.mc, &actor->internal->context.mc); #else - ph_mcontext_resume(&PHACTOR_G(actor_system)->worker_threads[thread_offset].context.mc, &actor->internal->context.mc); + ph_mcontext_resume(&PHACTOR_ZG(ph_thread)->context.mc, &actor->internal->context.mc); #endif } @@ -203,7 +198,7 @@ ph_actor_t *new_actor(ph_task_t *task) void perform_actor_removals(void) { - ph_vector_t *actor_removals = PHACTOR_G(actor_system)->actor_removals + thread_offset; + ph_vector_t *actor_removals = &PHACTOR_ZG(ph_thread)->actor_removals; pthread_mutex_lock(&actor_removals->lock); @@ -218,6 +213,7 @@ void message_handling_loop(ph_thread_t *ph_thread) { while (1) { perform_actor_removals(); + uv_run(&ph_thread->event_loop, UV_RUN_ONCE); pthread_mutex_lock(&ph_thread->tasks.lock); ph_task_t *current_task = ph_queue_pop(&ph_thread->tasks); @@ -243,17 +239,20 @@ void message_handling_loop(ph_thread_t *ph_thread) ph_actor_t *actor = ph_hashtable_search(&PHACTOR_G(actor_system)->actors_by_ref, ¤t_task->u.rat.actor_ref); pthread_mutex_unlock(&PHACTOR_G(actor_system)->actors_by_ref.lock); - assert(actor && actor->internal); // may change in future + if (actor) { + assert(actor->internal); // may change in future - ph_str_value_free(¤t_task->u.rat.actor_ref); + ph_str_value_free(¤t_task->u.rat.actor_ref); + PHACTOR_ZG(currently_executing_actor) = actor; - resume_actor(actor); + resume_actor(actor); + } break; case PH_NEW_ACTOR_TASK: - currently_processing_actor = new_actor(current_task); + PHACTOR_ZG(currently_executing_actor) = new_actor(current_task); - if (currently_processing_actor) { - process_message(currently_processing_actor); + if (PHACTOR_ZG(currently_executing_actor)) { + process_message(PHACTOR_ZG(currently_executing_actor)); } break; case PH_FREE_VM_STACK_TASK: @@ -266,8 +265,6 @@ void message_handling_loop(ph_thread_t *ph_thread) void *worker_function(ph_thread_t *ph_thread) { - thread_offset = ph_thread->offset; - thread = ph_thread; ph_thread->id = (ulong) pthread_self(); ph_thread->ls = ts_resource(0); @@ -287,9 +284,14 @@ void *worker_function(ph_thread_t *ph_thread) pthread_mutex_unlock(&PHACTOR_G(actor_system)->lock); ph_vmcontext_get(&ph_thread->context.vmc); + PHACTOR_ZG(ph_thread) = ph_thread; + + uv_loop_init(&ph_thread->event_loop); message_handling_loop(ph_thread); + uv_loop_close(&ph_thread->event_loop); + PG(report_memleaks) = 0; pthread_mutex_lock(&PHACTOR_G(actor_system)->lock); @@ -330,9 +332,9 @@ void initialise_actor_system(zend_long thread_count) PHACTOR_G(actor_system)->thread_count = thread_count; PHACTOR_G(main_thread).id = (ulong) pthread_self(); PHACTOR_G(main_thread).ls = TSRMLS_CACHE; - PHACTOR_G(actor_system)->actor_removals = calloc(sizeof(ph_vector_t), PHACTOR_G(actor_system)->thread_count + 1); PHACTOR_G(actor_system)->worker_threads = calloc(sizeof(ph_thread_t), PHACTOR_G(actor_system)->thread_count + 1); pthread_mutex_init(&PHACTOR_G(actor_system)->lock, NULL); + PHACTOR_ZG(ph_thread) = PHACTOR_G(actor_system)->worker_threads + PHACTOR_G(actor_system)->thread_count; for (int i = 0; i <= PHACTOR_G(actor_system)->thread_count; ++i) { ph_thread_t *thread = PHACTOR_G(actor_system)->worker_threads + i; @@ -341,14 +343,11 @@ void initialise_actor_system(zend_long thread_count) ph_queue_init(&thread->tasks, ph_task_free); if (i != PHACTOR_G(actor_system)->thread_count) { - ph_vector_init(PHACTOR_G(actor_system)->actor_removals + i, 4, ph_actor_free); + ph_vector_init(&thread->actor_removals, 4, ph_actor_free); pthread_create(&thread->pthread, NULL, (void *) worker_function, thread); } } - thread_offset = PHACTOR_G(actor_system)->thread_count; - thread = PHACTOR_G(actor_system)->worker_threads + PHACTOR_G(actor_system)->thread_count; - while (PHACTOR_G(actor_system)->thread_count != PHACTOR_G(actor_system)->prepared_thread_count); // automatically invoke ActorSystem::block(), using [&PHACTOR_G(actor_system)->obj, "block"] callable @@ -449,13 +448,12 @@ void php_actor_system_dtor_object(zend_object *obj) void php_actor_system_free_object(zend_object *obj) { for (int i = 0; i < PHACTOR_G(actor_system)->thread_count; ++i) { - ph_vector_destroy(PHACTOR_G(actor_system)->actor_removals + i); + ph_vector_destroy(&(PHACTOR_G(actor_system)->worker_threads + i)->actor_removals); } pthread_mutex_destroy(&PHACTOR_G(actor_system)->lock); free(PHACTOR_G(actor_system)->worker_threads); - free(PHACTOR_G(actor_system)->actor_removals); } zval *ph_actor_system_read_property(zval *object, zval *member, int type, void **cache, zval *rv) diff --git a/src/classes/actor_system.h b/src/classes/actor_system.h index b73ee41892..e889087d80 100644 --- a/src/classes/actor_system.h +++ b/src/classes/actor_system.h @@ -23,6 +23,8 @@ #include
+#include + #include "src/ph_context.h" #include "src/ds/ph_hashtable.h" #include "src/ds/ph_vector.h" @@ -38,6 +40,8 @@ typedef struct _ph_thread_t { int offset; void*** ls; // pointer to local storage in TSRM ph_context_t context; + ph_vector_t actor_removals; + uv_loop_t event_loop; } ph_thread_t; typedef struct _ph_actor_system_t { @@ -50,7 +54,6 @@ typedef struct _ph_actor_system_t { int prepared_thread_count; int finished_thread_count; ph_thread_t *worker_threads; - ph_vector_t *actor_removals; // @todo why not put this in ph_thread_t instead? pthread_mutex_t lock; zend_object obj; } ph_actor_system_t; diff --git a/src/classes/non-blocking/file_handle.c b/src/classes/non-blocking/file_handle.c new file mode 100644 index 0000000000..555f88a0d2 --- /dev/null +++ b/src/classes/non-blocking/file_handle.c @@ -0,0 +1,272 @@ +/* + +----------------------------------------------------------------------+ + | PHP Version 7 | + +----------------------------------------------------------------------+ + | Copyright (c) 1997-present The PHP Group | + +----------------------------------------------------------------------+ + | This source file is subject to version 3.01 of the PHP license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.php.net/license/3_01.txt | + | If you did not receive a copy of the PHP license and are unable to | + | obtain it through the world-wide-web, please send a note to | + | license@php.net so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ + | Author: Thomas Punt | + +----------------------------------------------------------------------+ +*/ + +#include
+#include + +#include "php_phactor.h" +#include "src/ph_task.h" +#include "src/ph_message.h" +#include "src/classes/actor_system.h" +#include "src/classes/non-blocking/file_handle.h" + +extern ph_actor_system_t *actor_system; + +zend_object_handlers ph_FileHandle_handlers; +zend_class_entry *ph_FileHandle_ce; + +ph_file_handle_t *ph_file_handle_retrieve_from_object(zend_object *file_handle_obj) +{ + return (ph_file_handle_t *)((char *)file_handle_obj - file_handle_obj->handlers->offset); +} + +void ph_fetch_and_reschedule_actor(ph_file_handle_t *fh) +{ + pthread_mutex_lock(&PHACTOR_G(actor_system)->actors_by_ref.lock); + ph_actor_t *actor = ph_hashtable_search(&PHACTOR_G(actor_system)->actors_by_ref, &fh->actor_ref); + pthread_mutex_unlock(&PHACTOR_G(actor_system)->actors_by_ref.lock); + + // There's no race condition here, since only the thread that created the + // actor can free it + if (actor && actor->state == PH_ACTOR_BLOCKING && fh->actor_restart_count == actor->restart_count) { + pthread_mutex_lock(&PHACTOR_ZG(ph_thread)->tasks.lock); + ph_queue_push(&PHACTOR_ZG(ph_thread)->tasks, ph_task_create_resume_actor(actor)); + pthread_mutex_unlock(&PHACTOR_ZG(ph_thread)->tasks.lock); + } +} + +void ph_file_open(uv_fs_t* req) +{ + ph_file_handle_t *fh = (ph_file_handle_t *)req; + + if (req->result < 0) { + switch (req->result) { + case UV_ENOENT: + zend_throw_exception_ex(NULL, 0, "Cannot open file because it does not exist"); + break; + default: + zend_throw_exception_ex(NULL, 0, "Cannot open file because UNKNOWN (%d)", req->result); + } + } else { + fh->fd = req->result; + } + + ph_fetch_and_reschedule_actor(fh); +} + +void ph_file_stat(uv_fs_t* req) +{ + ph_file_handle_t *fh = (ph_file_handle_t *)req; + + if (req->result < 0) { + zend_throw_exception_ex(NULL, 0, "An error occurred when reading the file's stats (%d)", req->result); + } else { + fh->file_size = req->statbuf.st_size; + } + + uv_fs_req_cleanup(req); + ph_fetch_and_reschedule_actor(fh); +} + +void ph_file_read(uv_fs_t* req) +{ + ph_file_handle_t *fh = (ph_file_handle_t *)req; + + if (req->result < 0 && req->result != UV_EOF) { + zend_throw_exception_ex(NULL, 0, "Could not read file (%d)", req->result); + } + + uv_fs_req_cleanup(req); + ph_fetch_and_reschedule_actor(fh); +} + +void ph_file_write(uv_fs_t* req) +{ + if (req->result < 0) { + zend_throw_exception_ex(NULL, 0, "Could not write to file (%d)", req->result); + } + + uv_fs_req_cleanup(req); + ph_fetch_and_reschedule_actor((ph_file_handle_t *)req); +} + +void ph_blocking_context_switch(ph_actor_t *actor) +{ + pthread_mutex_lock(&actor->lock); + + if (actor->state == PH_ACTOR_ACTIVE) { + actor->state = PH_ACTOR_BLOCKING; + + pthread_mutex_unlock(&actor->lock); + + ph_vmcontext_swap(&actor->internal->context.vmc, &PHACTOR_ZG(ph_thread)->context.vmc); + +#ifdef PH_FIXED_STACK_SIZE + ph_mcontext_swap(&actor->internal->context.mc, &PHACTOR_ZG(ph_thread)->context.mc); +#else + ph_mcontext_interrupt(&actor->internal->context.mc, &PHACTOR_ZG(ph_thread)->context.mc); +#endif + + pthread_mutex_lock(&actor->lock); + + actor->state = PH_ACTOR_ACTIVE; + } + + pthread_mutex_unlock(&actor->lock); +} + +ZEND_BEGIN_ARG_INFO_EX(FileHandle___construct_arginfo, 0, 0, 1) + ZEND_ARG_INFO(0, fileName) +ZEND_END_ARG_INFO() + +PHP_METHOD(FileHandle, __construct) +{ + ph_actor_t *actor = PHACTOR_ZG(currently_executing_actor); + char *filename; + size_t length; + + ZEND_PARSE_PARAMETERS_START(1, 1) + Z_PARAM_STRING(filename, length) + ZEND_PARSE_PARAMETERS_END(); + + ph_file_handle_t *fh = ph_file_handle_retrieve_from_object(Z_OBJ_P(getThis())); + + // @todo check for NUL byte character (file name is not NUL safe) + + fh->name = filename; + fh->actor_restart_count = actor->restart_count; + + if (!uv_fs_open(&PHACTOR_ZG(ph_thread)->event_loop, (uv_fs_t *)fh, fh->name, O_ASYNC, 0, ph_file_open)) { + ph_blocking_context_switch(actor); + } else { + zend_throw_exception(NULL, "Failed to begin opening the file", 0); + } +} + +ZEND_BEGIN_ARG_INFO_EX(FileHandle_read_arginfo, 0, 0, 0) +ZEND_END_ARG_INFO() + +PHP_METHOD(FileHandle, read) +{ + if (zend_parse_parameters_none() != SUCCESS) { + return; + } + + ph_file_handle_t *fh = ph_file_handle_retrieve_from_object(Z_OBJ_P(getThis())); + ph_actor_t *actor = PHACTOR_ZG(currently_executing_actor); + + fh->actor_restart_count = actor->restart_count; + + uv_fs_stat(&PHACTOR_ZG(ph_thread)->event_loop, (uv_fs_t *)fh, fh->name, ph_file_stat); + + ph_blocking_context_switch(actor); + + if (EG(exception)) { + return; + } + + if (!fh->file_size) { + RETURN_EMPTY_STRING(); + } + + uv_buf_t buffer[1]; + + fh->buffer_size = fh->file_size; + fh->buffer = emalloc(fh->buffer_size); + buffer[0] = uv_buf_init(fh->buffer, fh->buffer_size); + + if (!uv_fs_read(&PHACTOR_ZG(ph_thread)->event_loop, (uv_fs_t *)fh, (uv_file)fh->fd, buffer, 1, 0, ph_file_read)) { + ph_blocking_context_switch(actor); + + if (!EG(exception)) { + RETVAL_NEW_STR(zend_string_init(fh->buffer, fh->buffer_size, 0)); + } + } else { + zend_throw_exception(NULL, "Failed to begin reading the file", 0); + } + + efree(fh->buffer); + fh->buffer_size = 0; + fh->buffer = NULL; +} + +ZEND_BEGIN_ARG_INFO_EX(FileHandle_write_arginfo, 0, 0, 1) + ZEND_ARG_INFO(0, content) +ZEND_END_ARG_INFO() + +PHP_METHOD(FileHandle, write) +{ + char *content; + size_t length; + + ZEND_PARSE_PARAMETERS_START(1, 1) + Z_PARAM_STRING(content, length) + ZEND_PARSE_PARAMETERS_END(); + + ph_file_handle_t *fh = ph_file_handle_retrieve_from_object(Z_OBJ_P(getThis())); + ph_actor_t *actor = PHACTOR_ZG(currently_executing_actor); + uv_buf_t buffer[1]; + + fh->actor_restart_count = actor->restart_count; + buffer[0] = uv_buf_init(content, length); + + if (!uv_fs_write(&PHACTOR_ZG(ph_thread)->event_loop, (uv_fs_t *)fh, fh->fd, buffer, 1, -1, ph_file_write)) { + ph_blocking_context_switch(actor); + } else { + zend_throw_exception(NULL, "Failed to begin writing to the file", 0); + } +} + +zend_object* ph_file_handle_ctor(zend_class_entry *entry) +{ + ph_file_handle_t *fh = ecalloc(1, sizeof(ph_file_handle_t) + zend_object_properties_size(entry)); + + zend_object_std_init(&fh->obj, entry); + object_properties_init(&fh->obj, entry); + + fh->obj.handlers = &ph_FileHandle_handlers; + + ph_str_copy(&fh->actor_ref, PHACTOR_ZG(currently_executing_actor)->ref); + fh->file_size = -1; + + return &fh->obj; +} + +zend_function_entry FileHandle_methods[] = { + PHP_ME(FileHandle, __construct, FileHandle___construct_arginfo, ZEND_ACC_PUBLIC) + PHP_ME(FileHandle, read, FileHandle_read_arginfo, ZEND_ACC_PUBLIC) + PHP_ME(FileHandle, write, FileHandle_write_arginfo, ZEND_ACC_PUBLIC) + PHP_FE_END +}; + +void ph_file_handle_ce_init(void) +{ + zend_class_entry ce; + zend_object_handlers *zh = zend_get_std_object_handlers(); + + INIT_CLASS_ENTRY(ce, "phactor\\FileHandle", FileHandle_methods); + ph_FileHandle_ce = zend_register_internal_class(&ce); + ph_FileHandle_ce->create_object = ph_file_handle_ctor; + ph_FileHandle_ce->ce_flags |= ZEND_ACC_FINAL; + + memcpy(&ph_FileHandle_handlers, zh, sizeof(zend_object_handlers)); + + ph_FileHandle_handlers.offset = XtOffsetOf(ph_file_handle_t, obj); + // ph_FileHandle_handlers.read_property = ph_actor_ref_read_property; + // ph_FileHandle_handlers.write_property = ph_actor_ref_write_property; +} diff --git a/src/classes/non-blocking/file_handle.h b/src/classes/non-blocking/file_handle.h new file mode 100644 index 0000000000..d146edf4d6 --- /dev/null +++ b/src/classes/non-blocking/file_handle.h @@ -0,0 +1,40 @@ +/* + +----------------------------------------------------------------------+ + | PHP Version 7 | + +----------------------------------------------------------------------+ + | Copyright (c) 1997-present The PHP Group | + +----------------------------------------------------------------------+ + | This source file is subject to version 3.01 of the PHP license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.php.net/license/3_01.txt | + | If you did not receive a copy of the PHP license and are unable to | + | obtain it through the world-wide-web, please send a note to | + | license@php.net so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ + | Author: Thomas Punt | + +----------------------------------------------------------------------+ +*/ + +#ifndef PH_FILE_HANDLE_H +#define PH_FILE_HANDLE_H + +#include + +#include "src/ph_string.h" + +typedef struct _ph_file_handle_t { + uv_fs_t fs; + char *name; + char *buffer; + uv_file fd; + zend_long buffer_size; + zend_long file_size; + ph_string_t actor_ref; + int actor_restart_count; // maintains actor version number + zend_object obj; +} ph_file_handle_t; + +void ph_file_handle_ce_init(void); + +#endif diff --git a/src/classes/supervisor.c b/src/classes/supervisor.c index 0c7fa81dfd..5b4813e1a6 100644 --- a/src/classes/supervisor.c +++ b/src/classes/supervisor.c @@ -45,7 +45,7 @@ ph_supervisor_t *ph_supervisor_fetch_from_object(zend_object *supervisor_obj) void ph_supervisor_one_for_one(void *crashed_actor_void) { ph_actor_t *crashed_actor = crashed_actor_void; - ph_thread_t *thread = PHACTOR_G(actor_system)->worker_threads + crashed_actor->thread_offset; + ph_thread_t *thread = crashed_actor->ph_thread; // @note for now, reschedule the actor on the same thread. This prevents the // need to free the VM stack completely, as well as ensuring static members @@ -84,6 +84,8 @@ void ph_supervisor_one_for_one(void *crashed_actor_void) void ph_supervisor_handle_crash(ph_actor_t *supervisor, ph_actor_t *crashed_actor) { + ++crashed_actor->restart_count; + if (++crashed_actor->restart_count_streak == supervisor->supervision->restart_count_streak_max) { // @todo how should we react? Log it? Crash the supervisor? // default to crashing the supervisor diff --git a/src/ph_string.c b/src/ph_string.c index 6f48e2044e..1186aace8f 100644 --- a/src/ph_string.c +++ b/src/ph_string.c @@ -52,6 +52,13 @@ void ph_str_set(ph_string_t *phstr, char *s, int len) memcpy(PH_STRV_P(phstr), s, len); } +void ph_str_copy(ph_string_t *phstr1, ph_string_t *phstr2) +{ + PH_STRL_P(phstr1) = PH_STRL_P(phstr2); + PH_STRV_P(phstr1) = malloc(sizeof(char) * PH_STRL_P(phstr2)); + memcpy(PH_STRV_P(phstr1), PH_STRV_P(phstr2), PH_STRL_P(phstr2)); +} + int ph_str_eq(ph_string_t *phstr1, ph_string_t *phstr2) { return PH_STRL_P(phstr1) == PH_STRL_P(phstr2) && !strncmp(PH_STRV_P(phstr1), PH_STRV_P(phstr2), PH_STRL_P(phstr2)); diff --git a/src/ph_string.h b/src/ph_string.h index 38bd59808a..8a3dfb67cd 100644 --- a/src/ph_string.h +++ b/src/ph_string.h @@ -32,6 +32,7 @@ typedef struct _ph_string_t { ph_string_t *ph_str_alloc(int len); ph_string_t *ph_str_create(char *s, int len); void ph_str_set(ph_string_t *phstr, char *s, int len); +void ph_str_copy(ph_string_t *phstr1, ph_string_t *phstr2); int ph_str_eq(ph_string_t *phstr1, ph_string_t *phstr2); void ph_str_value_free(ph_string_t *phstr); void ph_str_free(ph_string_t *phstr);