// Author: Pete Kirkham // string match benchmark - block io, strncmp string match, fork/receive parallelism. #include #include #include #include #include #include #define BLOCK_SIZE 8096 #define CHUNK_SIZE 2 #define CHUNK_DATA 3 // naive string search size_t string_search (const char* pattern, size_t pattern_length, const char* string, size_t string_length) { const char first(pattern[0]); const size_t last_index(string_length - pattern_length); for (size_t index(0); index < last_index; ++index) if ((first == string[index]) && (strncmp(pattern, string + index, pattern_length) == 0)) return index; return string_length; } // process a line of input inline void process_line (size_t& matches, const char* pattern, size_t pattern_length, const char* line, size_t line_length) { // index of start of match // all lines start with x.x.x.x - - [DD/MMM/YYY:HH:MM:SS -0700] "GET // so we could start 34 chars into the line size_t index(string_search(pattern, pattern_length, line, line_length) + pattern_length); while (index < line_length) { char ch(line[index]); if ((ch == '.') || (ch == '\n')) break; if (ch == ' ') { ++matches; break; } ++index; } } // split chunk into lines and process inline void process_chunk(size_t& matches, const char* pattern, size_t pattern_length, const char* buf, size_t chunk_size) { for (size_t i(0), line_start(0); i < chunk_size; ++i) { if (buf[i] == '\n') { size_t line_length(i - line_start); const char* line(buf + line_start); process_line(matches, pattern, pattern_length, line, line_length); line_start = i+1; } } } // main int main (int narg, char** argvp) { int nprocs; int procid; MPI_Status stat; MPI_Init(&narg, &argvp); MPI_Comm_size(MPI_COMM_WORLD,&nprocs); MPI_Comm_rank(MPI_COMM_WORLD,&procid); if (narg < 2) { std::cout << "The name of the input file is required." << std::endl; return 1; } // pattern to find const char* pattern("GET /ongoing/When/"); size_t pattern_length(strlen(pattern)); // count of matches size_t matches(0); // buffer for data char buf[BLOCK_SIZE*2]; // process 0 loads and splits file, scatters to match processes // then reduces counts if (procid == 0) { // open the file int file(open(argvp[1], O_RDONLY)); if (!file) { std::cout << "failed to open " << argvp[1] << std::endl; return 2; } // block IO // for splitting lines, we move any trailing part line to the start of // memory then read data into the area following it ssize_t bytes_read; size_t bytes_carried(0); int proc(0); while ((bytes_read = read(file, buf + bytes_carried, BLOCK_SIZE)) > 0) { size_t chunk_size(bytes_carried + bytes_read); size_t last_line(0); // scan for last line break for (size_t i(chunk_size - 1); i > 0; --i) { if (buf[i] == '\n') { last_line = i + 1; break; } } // send chunk to other processes // the MPI built-in scatter would require you have the whole file // in memory before sending, and would split into equal segments if (nprocs > 1) { int worker_proc((++proc % (nprocs-1)) + 1); MPI_Send(&last_line, 1, MPI_UNSIGNED_LONG, worker_proc, CHUNK_SIZE, MPI_COMM_WORLD); MPI_Send(buf, last_line, MPI_CHAR, worker_proc, CHUNK_DATA, MPI_COMM_WORLD); } else { process_chunk(matches, pattern, pattern_length, buf, chunk_size); } // copy trailing data to start of alternate block // assumes that a line is never more than half a block long bytes_carried = chunk_size - last_line; memcpy(buf, buf + last_line, bytes_carried); } // if there is any last line, process that in this process if (bytes_carried) { process_line(matches, pattern, pattern_length, buf, bytes_carried); } // tell the other processes we're done by sending a zero size chunk // to each size_t chunk_done(0); for (int worker_proc(1); worker_proc < nprocs; ++worker_proc) { MPI_Send(&chunk_done, 1, MPI_UNSIGNED_LONG, worker_proc, CHUNK_SIZE, MPI_COMM_WORLD); } close(file); } else // other processes receive chunks and report matches { for (;;) { // receive chunk size size_t chunk_size(0); MPI_Recv(&chunk_size, 1, MPI_UNSIGNED_LONG, 0, CHUNK_SIZE, MPI_COMM_WORLD, &stat); if (chunk_size) { // receive chunk data MPI_Recv(buf, chunk_size, MPI_CHAR, 0, CHUNK_DATA, MPI_COMM_WORLD, &stat); process_chunk(matches, pattern, pattern_length, buf, chunk_size); continue; } break; } } size_t total(0); // reduce matches from sibling processes MPI_Reduce(&matches, &total, 1, MPI_UNSIGNED_LONG, MPI_SUM, 0, MPI_COMM_WORLD); std::cout << "matches: " << matches << " <- " << procid << std::endl; if (procid == 0) { std::cout << "total: " << total<< std::endl; } MPI_Finalize(); return 0; }