登峰小蚁 2020-05-11
// Obtain the SparkSession for the "Apache User Log Processing" application.
SparkSession spark = SparkSession
    .Builder()
    .AppName("Apache User Log Processing")
    .GetOrCreate();

// Read the input data set as text into a DataFrame.
DataFrame generalDf = spark
    .Read()
    .Text("<path to input data set>");
// Pattern for a single Apache access-log entry. Capture groups, in order:
//   1-3  three whitespace-free fields at the start of the line
//   4    the bracketed timestamp, e.g. 10/Oct/2000:13:55:36 -0700
//   5-7  the three tokens inside the quoted request (group 5 is the method)
//   8    a three-digit code
//   9    a run of digits
// A verbatim string is used so the backslashes reach the regex engine unchanged
// ("" is a literal double quote). The square brackets around the timestamp are
// escaped; left unescaped they open a character class and the pattern is invalid.
string s_apacheRx = @"^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] ""(\S+) (\S+) (\S+)"" (\d{3}) (\d+)";
我们如何对 DataFrame 的每一行执行计算,比如将每个日志条目与上面的 s_apacheRx 进行匹配?答案是 Spark SQL:先把匹配逻辑注册为用户定义函数(UDF),再在 SQL 查询中调用它。
// Register the input rows under the view name that the query below selects from;
// without this the SQL statement has no "Logs" relation to read.
generalDf.CreateOrReplaceTempView("Logs");

// UDF: true when a log line matches the Apache access-log pattern.
spark.Udf().Register<string, bool>(
    "GeneralReg",
    log => Regex.IsMatch(log, s_apacheRx));

// generalDf is already declared where the input is read, so it is reassigned
// here rather than declared a second time.
generalDf = spark.Sql(
    "SELECT logs.value, GeneralReg(logs.value) FROM Logs");

// Keep only the rows whose GeneralReg column is true, then display them.
generalDf = generalDf.Filter(generalDf["GeneralReg(value)"]);
generalDf.Show();
// Choose valid log entries that start with 10
spark.Udf().Register<string, bool>(
    "IPReg",
    log => Regex.IsMatch(log, "^(?=10)"));

generalDf.CreateOrReplaceTempView("IPLogs");

// Apply UDF to get valid log entries starting with 10
DataFrame ipDf = spark.Sql(
    "SELECT iplogs.value FROM IPLogs WHERE IPReg(iplogs.value)");
ipDf.Show();

// Choose valid log entries that start with 10 and deal with spam
spark.Udf().Register<string, bool>(
    "SpamRegEx",
    log => Regex.IsMatch(log, "\\b(?=spam)\\b"));

ipDf.CreateOrReplaceTempView("SpamLogs");

// Apply UDF to get valid, start with 10, spam entries
DataFrame spamDF = spark.Sql(
    "SELECT spamlogs.value FROM SpamLogs WHERE SpamRegEx(spamlogs.value)");
// Collect the remaining rows and count those whose "value" column is a GET request.
int numGetRequests = spamDF
    .Collect()
    .Count(row => ContainsGet(row.GetAs<string>("value")));
/// <summary>
/// Determines whether a log line is a valid log entry whose request method is GET.
/// </summary>
/// <remarks>
/// Uses regex matching to group data: each group matches a column in the log
/// schema, i.e. first group = first column = IP. Every line that matches is
/// also written to the console.
/// </remarks>
/// <param name="logLine">The raw log line to inspect.</param>
/// <returns>
/// <c>true</c> if the line matches <c>s_apacheRx</c> and its 5th group equals
/// "GET"; otherwise <c>false</c>.
/// </returns>
public static bool ContainsGet(string logLine)
{
    Match match = Regex.Match(logLine, s_apacheRx);

    // Not a valid log entry, so it cannot be a GET request
    if (!match.Success)
    {
        return false;
    }

    Console.WriteLine("Full log entry: ‘{0}‘", match.Groups[0].Value);

    // 5th column/group in schema is "method"
    return match.Groups[5].Value == "GET";
}
# Submit LoggingApp.dll through spark-submit with a local master.
spark-submit \
  --class org.apache.spark.deploy.dotnet.DotnetRunner \
  --master local \
  /path/to/microsoft-spark-<version>.jar \
  dotnet /path/to/netcoreapp<version>/LoggingApp.dll
// Builds the query text by appending std_name to a fixed SELECT prefix.
// NOTE(review): std_name is spliced directly into the SQL text; if it can come
// from untrusted input, this should become a parameterized query — confirm the source of std_name.
var mySqlQuery = "SELECT * FROM table WHERE id = " + std_name;
每次执行这条查询时,返回的结果都可能不同,这取决于 std_name 的值。